cyfs_chunk_lib/
shared_mem_chunk.rs1use std::io::SeekFrom;
2use std::ops::{Deref, DerefMut};
3use shared_memory::{Shmem, ShmemConf, ShmemError};
4use cyfs_base::*;
5use crate::{Chunk, ChunkMeta, ChunkMut};
6
7pub struct SharedMemChunk {
8 unique_id: String,
9 shmem: Shmem,
10 cur_pos: usize,
11 capacity: usize,
12 data_len: usize,
13}
14
15unsafe impl Sync for SharedMemChunk {
16
17}
18
19unsafe impl Send for SharedMemChunk {
20
21}
22
23impl SharedMemChunk {
24 pub fn new(capacity: usize, data_len: usize, unique_id: &str) -> BuckyResult<Self> {
25 log::debug!("new shared mem chunk: capacity={}, data_len={}, unique_id={}", capacity, data_len, unique_id);
26
27 match ShmemConf::new().size(capacity).os_id(unique_id).create() {
28 Ok(shmem) => Ok(Self {
29 unique_id: unique_id.to_string(),
30 shmem,
31 cur_pos: 0,
32 capacity,
33 data_len
34 }),
35 Err(ShmemError::MappingIdExists) => {
36 match ShmemConf::new().size(capacity).os_id(unique_id).open() {
37 Ok(shmem) => Ok(Self {
38 unique_id: unique_id.to_string(),
39 shmem,
40 cur_pos: 0,
41 capacity,
42 data_len
43 }),
44 Err(e) => {
45 let msg = format!("open shared mem {} failed. {}", unique_id, e);
46 log::error!("{}", msg.as_str());
47 Err(BuckyError::new(BuckyErrorCode::Failed, msg))
48 }
49 }
50 },
51 Err(e) => {
52 let msg = format!("create shared mem {} failed. {}", unique_id, e);
53 log::error!("{}", msg.as_str());
54 Err(BuckyError::new(BuckyErrorCode::Failed, msg))
55 }
56 }
57 }
58}
59
60#[async_trait::async_trait]
61impl Chunk for SharedMemChunk {
62 fn get_chunk_meta(&self) -> ChunkMeta {
63 ChunkMeta::SharedMemChunk(self.unique_id.clone(), self.capacity as u32, self.data_len as u32)
64 }
65
66 fn get_len(&self) -> usize {
67 self.data_len
68 }
69
70 fn into_vec(self: Box<Self>) -> Vec<u8> {
71 unsafe {
72 self.shmem.as_slice().to_vec()
73 }
74 }
75
76 async fn read(&mut self, buf: &mut [u8]) -> BuckyResult<usize> {
77 let this = self;
80 let shmem = &this.shmem;
81 if this.cur_pos >= this.data_len {
82 Ok(0)
83 } else if buf.len() > this.data_len - this.cur_pos {
84 let read_len = this.data_len - this.cur_pos;
85 unsafe {std::ptr::copy::<u8>(shmem.as_slice()[this.cur_pos..].as_ptr(), buf.as_mut_ptr(), read_len)};
86 this.cur_pos = this.data_len;
87 Ok(read_len)
88 } else {
89 let read_len = buf.len();
90 unsafe {std::ptr::copy::<u8>(shmem.as_slice()[this.cur_pos..this.cur_pos + read_len].as_ptr(), buf.as_mut_ptr(), read_len)};
91 this.cur_pos += read_len;
92 Ok(read_len)
93 }
94 }
95
96 async fn seek(&mut self, pos: SeekFrom) -> BuckyResult<u64> {
97 let this = self;
100 match pos {
101 SeekFrom::Start(pos) => {
102 this.cur_pos = pos as usize;
103 Ok(pos)
104 },
105 SeekFrom::End(pos) => {
106 if this.data_len as i64 + pos < 0 {
107 return Err(BuckyError::new(BuckyErrorCode::Failed, format!("seek failed")));
108 }
109 this.cur_pos = (this.data_len as i64 + pos) as usize;
110 Ok(this.cur_pos as u64)
111 },
112 SeekFrom::Current(pos) => {
113 if this.cur_pos as i64 + pos < 0 {
114 return Err(BuckyError::new(BuckyErrorCode::Failed, format!("seek failed")));
115 }
116 this.cur_pos = (this.cur_pos as i64 + pos) as usize;
117 Ok(this.cur_pos as u64)
118 }
119 }
120 }
121}
122
123impl Deref for SharedMemChunk {
124 type Target = [u8];
125
126 fn deref(&self) -> &Self::Target {
127 unsafe {
128 &self.shmem.as_slice()[..self.capacity]
129 }
130 }
131}
132
133#[async_trait::async_trait]
134impl ChunkMut for SharedMemChunk {
135 async fn reset(&mut self) -> BuckyResult<()> {
136 self.cur_pos = 0;
137 self.data_len = 0;
138 Ok(())
139 }
140
141 async fn write(&mut self, buf: &[u8]) -> BuckyResult<usize> {
142 let this = self;
143 let shmem = &this.shmem;
144 unsafe {
145 if this.cur_pos + buf.len() >= shmem.len() {
146 let write_size = shmem.len() - this.cur_pos;
147 std::ptr::copy(buf.as_ptr(), shmem.as_ptr().offset(this.cur_pos as isize), write_size);
148 this.cur_pos = shmem.len();
149 if this.cur_pos > this.data_len {
150 this.data_len = this.cur_pos;
151 }
152 Ok(write_size)
153 } else {
154 std::ptr::copy(buf.as_ptr(), shmem.as_ptr().offset(this.cur_pos as isize), buf.len());
155 this.cur_pos += buf.len();
156 if this.cur_pos > this.data_len {
157 this.data_len = this.cur_pos;
158 }
159 Ok(buf.len())
160 }
161 }
162 }
163
164 async fn flush(&mut self) -> BuckyResult<()> {
165 Ok(())
166 }
167}
168
169#[cfg(not(target_os = "android"))]
170impl DerefMut for SharedMemChunk {
171 fn deref_mut(&mut self) -> &mut Self::Target {
172 unsafe {
173 &mut self.shmem.as_slice_mut()[..self.capacity]
174 }
175 }
176}