cyfs_chunk_lib/
shared_mem_chunk.rs

1use std::io::SeekFrom;
2use std::ops::{Deref, DerefMut};
3use shared_memory::{Shmem, ShmemConf, ShmemError};
4use cyfs_base::*;
5use crate::{Chunk, ChunkMeta, ChunkMut};
6
7pub struct SharedMemChunk {
8    unique_id: String,
9    shmem: Shmem,
10    cur_pos: usize,
11    capacity: usize,
12    data_len: usize,
13}
14
15unsafe impl Sync for SharedMemChunk {
16
17}
18
19unsafe impl Send for SharedMemChunk {
20
21}
22
23impl SharedMemChunk {
24    pub fn new(capacity: usize, data_len: usize, unique_id: &str) -> BuckyResult<Self> {
25        log::debug!("new shared mem chunk: capacity={}, data_len={}, unique_id={}", capacity, data_len, unique_id);
26
27        match ShmemConf::new().size(capacity).os_id(unique_id).create() {
28            Ok(shmem) => Ok(Self {
29                unique_id: unique_id.to_string(),
30                shmem,
31                cur_pos: 0,
32                capacity,
33                data_len
34            }),
35            Err(ShmemError::MappingIdExists) => {
36                match ShmemConf::new().size(capacity).os_id(unique_id).open() {
37                    Ok(shmem) => Ok(Self {
38                        unique_id: unique_id.to_string(),
39                        shmem,
40                        cur_pos: 0,
41                        capacity,
42                        data_len
43                    }),
44                    Err(e) => {
45                        let msg = format!("open shared mem {} failed. {}", unique_id, e);
46                        log::error!("{}", msg.as_str());
47                        Err(BuckyError::new(BuckyErrorCode::Failed, msg))
48                    }
49                }
50            },
51            Err(e) => {
52                let msg = format!("create shared mem {} failed. {}", unique_id, e);
53                log::error!("{}", msg.as_str());
54                Err(BuckyError::new(BuckyErrorCode::Failed, msg))
55            }
56        }
57    }
58}
59
60#[async_trait::async_trait]
61impl Chunk for SharedMemChunk {
62    fn get_chunk_meta(&self) -> ChunkMeta {
63        ChunkMeta::SharedMemChunk(self.unique_id.clone(), self.capacity as u32, self.data_len as u32)
64    }
65
66    fn get_len(&self) -> usize {
67        self.data_len
68    }
69
70    fn into_vec(self: Box<Self>) -> Vec<u8> {
71        unsafe {
72            self.shmem.as_slice().to_vec()
73        }
74    }
75
76    async fn read(&mut self, buf: &mut [u8]) -> BuckyResult<usize> {
77        // log::debug!("SharedMemChunk read: buf={}, cur_pos={}, data_len={}", buf.len(), self.cur_pos, self.data_len);
78
79        let this = self;
80        let shmem = &this.shmem;
81        if this.cur_pos >= this.data_len {
82            Ok(0)
83        } else if buf.len() > this.data_len - this.cur_pos {
84            let read_len = this.data_len - this.cur_pos;
85            unsafe {std::ptr::copy::<u8>(shmem.as_slice()[this.cur_pos..].as_ptr(), buf.as_mut_ptr(), read_len)};
86            this.cur_pos = this.data_len;
87            Ok(read_len)
88        } else {
89            let read_len = buf.len();
90            unsafe {std::ptr::copy::<u8>(shmem.as_slice()[this.cur_pos..this.cur_pos + read_len].as_ptr(), buf.as_mut_ptr(), read_len)};
91            this.cur_pos += read_len;
92            Ok(read_len)
93        }
94    }
95
96    async fn seek(&mut self, pos: SeekFrom) -> BuckyResult<u64> {
97        // log::debug!("SharedMemChunk seek: pos={:?}, cur_pos={}, data_len={}", pos, self.cur_pos, self.data_len);
98
99        let this = self;
100        match pos {
101            SeekFrom::Start(pos) => {
102                this.cur_pos = pos as usize;
103                Ok(pos)
104            },
105            SeekFrom::End(pos) => {
106                if this.data_len as i64 + pos < 0 {
107                    return Err(BuckyError::new(BuckyErrorCode::Failed, format!("seek failed")));
108                }
109                this.cur_pos = (this.data_len as i64 + pos) as usize;
110                Ok(this.cur_pos as u64)
111            },
112            SeekFrom::Current(pos) => {
113                if this.cur_pos as i64 + pos < 0 {
114                    return Err(BuckyError::new(BuckyErrorCode::Failed, format!("seek failed")));
115                }
116                this.cur_pos = (this.cur_pos as i64 + pos) as usize;
117                Ok(this.cur_pos as u64)
118            }
119        }
120    }
121}
122
123impl Deref for SharedMemChunk {
124    type Target = [u8];
125
126    fn deref(&self) -> &Self::Target {
127        unsafe {
128            &self.shmem.as_slice()[..self.capacity]
129        }
130    }
131}
132
133#[async_trait::async_trait]
134impl ChunkMut for SharedMemChunk {
135    async fn reset(&mut self) -> BuckyResult<()> {
136        self.cur_pos = 0;
137        self.data_len = 0;
138        Ok(())
139    }
140
141    async fn write(&mut self, buf: &[u8]) -> BuckyResult<usize> {
142        let this = self;
143        let shmem = &this.shmem;
144        unsafe {
145            if this.cur_pos + buf.len() >= shmem.len() {
146                let write_size = shmem.len() - this.cur_pos;
147                std::ptr::copy(buf.as_ptr(), shmem.as_ptr().offset(this.cur_pos as isize), write_size);
148                this.cur_pos = shmem.len();
149                if this.cur_pos > this.data_len {
150                    this.data_len = this.cur_pos;
151                }
152                Ok(write_size)
153            } else {
154                std::ptr::copy(buf.as_ptr(), shmem.as_ptr().offset(this.cur_pos as isize), buf.len());
155                this.cur_pos += buf.len();
156                if this.cur_pos > this.data_len {
157                    this.data_len = this.cur_pos;
158                }
159                Ok(buf.len())
160            }
161        }
162    }
163
164    async fn flush(&mut self) -> BuckyResult<()> {
165        Ok(())
166    }
167}
168
169#[cfg(not(target_os = "android"))]
170impl DerefMut for SharedMemChunk {
171    fn deref_mut(&mut self) -> &mut Self::Target {
172        unsafe {
173            &mut self.shmem.as_slice_mut()[..self.capacity]
174        }
175    }
176}