data_portal/
lib.rs

//! Data Portal library
//!
//! High-performance, cross-platform transport protocol library supporting POSIX shared memory and TCP network transport.
4
5pub use crate::protocol::*;
6pub use crate::transport::*;
7
8pub mod protocol {
9    use crc32fast::Hasher;
10    
    /// Fixed 32-byte header of the UTP protocol.
    ///
    /// `#[repr(C)]` keeps the fields in declaration order. The field sizes
    /// (4 + 1 + 1 + 2 + 4 + 4 + 8 + 4 + 4) add up to 32 bytes, which is the
    /// value of `PortalHeader::SIZE`.
    #[repr(C)]
    #[derive(Debug, Clone, Copy)]
    pub struct PortalHeader {
        /// Magic number, 0x55545000 ("UTP\0").
        pub magic: u32,
        /// Protocol version.
        pub version: u8,
        /// Message type.
        pub msg_type: u8,
        /// Control flags.
        pub flags: u16,
        /// Payload length.
        pub payload_len: u32,
        /// Sequence number.
        pub sequence: u32,
        /// Timestamp; `PortalHeader::new` fills it with nanoseconds since the UNIX epoch.
        pub timestamp: u64,
        /// CRC32 checksum computed over all fields that precede it.
        pub checksum: u32,
        /// Reserved for future use.
        pub reserved: [u8; 4],
    }
25    
26    impl PortalHeader {
27        pub const MAGIC: u32 = 0x55545000;
28        pub const SIZE: usize = 32;
29        
30        pub fn new(msg_type: u8, payload_len: u32, sequence: u32) -> Self {
31            let mut header = Self {
32                magic: Self::MAGIC,
33                version: 2,
34                msg_type,
35                flags: 0,
36                payload_len,
37                sequence,
38                timestamp: std::time::SystemTime::now()
39                    .duration_since(std::time::UNIX_EPOCH)
40                    .unwrap()
41                    .as_nanos() as u64,
42                checksum: 0,
43                reserved: [0; 4],
44            };
45            
46            // Calculate CRC32 checksum
47            header.checksum = header.calculate_checksum();
48            header
49        }
50        
51        pub fn to_bytes(&self) -> [u8; 32] {
52            unsafe { std::mem::transmute(*self) }
53        }
54        
55        pub fn from_bytes(bytes: &[u8; 32]) -> Self {
56            unsafe { std::mem::transmute(*bytes) }
57        }
58        
59        fn calculate_checksum(&self) -> u32 {
60            let mut hasher = Hasher::new();
61            hasher.update(&self.magic.to_le_bytes());
62            hasher.update(&[self.version]);
63            hasher.update(&[self.msg_type]);
64            hasher.update(&self.flags.to_le_bytes());
65            hasher.update(&self.payload_len.to_le_bytes());
66            hasher.update(&self.sequence.to_le_bytes());
67            hasher.update(&self.timestamp.to_le_bytes());
68            hasher.finalize()
69        }
70        
71        pub fn verify_checksum(&self) -> bool {
72            let mut temp_header = *self;
73            temp_header.checksum = 0;
74            let expected = temp_header.calculate_checksum();
75            self.checksum == expected
76        }
77    }
78}
79
80pub mod transport {
81    use std::ptr;
82    use std::slice;
83    use std::ffi::CString;
84    use anyhow::{Result, Context};
85    
    /// Transport layer backed by a POSIX shared memory object.
    pub struct SharedMemoryTransport {
        /// Name of the shared memory object, as passed to `shm_open`; it is
        /// used again to unlink the object when the transport is dropped.
        name: String,
        /// File descriptor returned by `shm_open`.
        fd: i32,
        /// Base address of the mapping returned by `mmap`.
        ptr: *mut u8,
        /// Length of the mapping in bytes.
        size: usize,
    }
93    
94    impl SharedMemoryTransport {
95        pub fn new(name: &str, size: usize) -> Result<Self> {
96            let c_name = CString::new(name).context("Invalid shared memory name")?;
97            
98            // Create shared memory segment
99            let fd = unsafe {
100                libc::shm_open(
101                    c_name.as_ptr(),
102                    libc::O_CREAT | libc::O_RDWR,
103                    0o666
104                )
105            };
106            
107            if fd == -1 {
108                return Err(anyhow::anyhow!("Failed to create shared memory segment"));
109            }
110            
111            // Set size
112            unsafe {
113                if libc::ftruncate(fd, size as libc::off_t) == -1 {
114                    libc::close(fd);
115                    return Err(anyhow::anyhow!("Failed to set shared memory size"));
116                }
117            }
118            
119            // Map memory
120            let ptr = unsafe {
121                libc::mmap(
122                    ptr::null_mut(),
123                    size,
124                    libc::PROT_READ | libc::PROT_WRITE,
125                    libc::MAP_SHARED,
126                    fd,
127                    0
128                )
129            };
130            
131            if ptr == libc::MAP_FAILED {
132                unsafe { libc::close(fd); }
133                return Err(anyhow::anyhow!("Failed to map shared memory"));
134            }
135            
136            Ok(Self {
137                name: name.to_string(),
138                fd,
139                ptr: ptr as *mut u8,
140                size,
141            })
142        }
143        
144        /// 零拷贝写入数据
145        pub unsafe fn write_zero_copy(&self, data: &[u8], offset: usize) -> Result<()> {
146            if offset + data.len() > self.size {
147                return Err(anyhow::anyhow!("Write would exceed shared memory bounds"));
148            }
149            
150            ptr::copy_nonoverlapping(
151                data.as_ptr(),
152                self.ptr.add(offset),
153                data.len()
154            );
155            
156            Ok(())
157        }
158        
159        /// 零拷贝读取数据
160        pub unsafe fn read_zero_copy(&self, offset: usize, len: usize) -> Result<&[u8]> {
161            if offset + len > self.size {
162                return Err(anyhow::anyhow!("Read would exceed shared memory bounds"));
163            }
164            
165            Ok(slice::from_raw_parts(self.ptr.add(offset), len))
166        }
167        
168        /// 获取原始指针(用于直接内存操作)
169        pub fn as_ptr(&self) -> *mut u8 {
170            self.ptr
171        }
172        
173        pub fn size(&self) -> usize {
174            self.size
175        }
176    }
177    
178    impl Drop for SharedMemoryTransport {
179        fn drop(&mut self) {
180            unsafe {
181                libc::munmap(self.ptr as *mut libc::c_void, self.size);
182                libc::close(self.fd);
183                
184                let c_name = CString::new(self.name.clone()).unwrap();
185                libc::shm_unlink(c_name.as_ptr());
186            }
187        }
188    }
189    
    // The raw `ptr` field makes the type neither `Send` nor `Sync`
    // automatically; these impls opt back in by hand.
    // NOTE(review): nothing in this type synchronizes access to the mapped
    // bytes, and `write_zero_copy` writes through `&self`. Sharing across
    // threads is therefore only sound if callers of the `unsafe` read/write
    // methods coordinate access themselves — confirm this is the intended
    // contract.
    unsafe impl Send for SharedMemoryTransport {}
    unsafe impl Sync for SharedMemoryTransport {}
192}