1use core::sync::atomic::{AtomicUsize, Ordering};
2
3use moto_sys::{syscalls::SysCpu, ErrorCode, SysHandle};
4
5struct PipeBuffer {
6 buf_addr: usize,
7 work_buf_len: usize,
8 work_buf: &'static mut [u8],
9 error_code: ErrorCode,
10 ipc_handle: SysHandle,
11}
12
13impl Drop for PipeBuffer {
14 fn drop(&mut self) {
15 if self.error_code.is_ok() {
16 SysCpu::wake(self.ipc_handle).ok();
17 }
18 moto_sys::syscalls::SysCtl::put(self.ipc_handle).unwrap();
19 moto_sys::syscalls::SysMem::unmap(SysHandle::SELF, 0, u64::MAX, self.buf_addr as u64)
20 .unwrap();
21 }
22}
23
24impl PipeBuffer {
25 const CACHELINE_SIZE: usize = 64;
26 const READER_COUNTER_OFFSET: usize = 0;
28 const WRITER_COUNTER_OFFSET: usize = Self::CACHELINE_SIZE;
29 const DATA_OFFSET: usize = Self::CACHELINE_SIZE * 2;
30
31 const VERSION_OFFSET: usize = Self::READER_COUNTER_OFFSET + 16;
32
33 unsafe fn new(buf_addr: usize, buf_size: usize, ipc_handle: SysHandle) -> Self {
34 assert!(buf_addr & (Self::CACHELINE_SIZE - 1) == 0); assert!(buf_size & (Self::CACHELINE_SIZE - 1) == 0); assert!((buf_size >> 1) + Self::DATA_OFFSET < buf_size);
37 assert!(is_power_of_two(buf_size));
38
39 assert!(Self::version(buf_addr) == 0);
40
41 let work_buf_len = buf_size >> 1;
42 PipeBuffer {
43 buf_addr,
44 work_buf_len,
45 work_buf: core::slice::from_raw_parts_mut(
46 (buf_addr + Self::DATA_OFFSET) as *mut u8,
47 work_buf_len,
48 ),
49 error_code: ErrorCode::Ok,
50 ipc_handle,
51 }
52 }
53
54 fn version(buf_addr: usize) -> u64 {
55 unsafe {
56 let addr = buf_addr + Self::VERSION_OFFSET;
57 *(addr as *const u64).as_ref().unwrap_unchecked()
58 }
59 }
60
61 fn reader_counter(&self) -> &AtomicUsize {
62 unsafe {
63 let addr = self.buf_addr + Self::READER_COUNTER_OFFSET;
64 (addr as *const AtomicUsize).as_ref().unwrap_unchecked()
65 }
66 }
67
68 fn writer_counter(&self) -> &AtomicUsize {
69 unsafe {
70 let addr = self.buf_addr + Self::WRITER_COUNTER_OFFSET;
71 (addr as *const AtomicUsize).as_ref().unwrap_unchecked()
72 }
73 }
74
75 fn assert_invariants(&self) {
76 assert!(
77 self.reader_counter().load(Ordering::Relaxed)
78 <= self.writer_counter().load(Ordering::Relaxed)
79 );
80 }
81
82 fn can_read(&self) -> bool {
83 self.reader_counter().load(Ordering::Relaxed)
84 < self.writer_counter().load(Ordering::Relaxed)
85 }
86
87 fn can_write(&self) -> bool {
88 self.writer_counter().load(Ordering::Relaxed)
89 < ((self.reader_counter().load(Ordering::Relaxed)) + self.work_buf_len)
90 }
91
92 fn write(&mut self, src: &[u8]) -> usize {
93 let reader_counter = self.reader_counter().load(Ordering::Acquire);
94 let writer_counter = self.writer_counter().load(Ordering::Relaxed);
95
96 let mut to_write = reader_counter + self.work_buf_len - writer_counter;
97
98 if to_write > src.len() {
99 to_write = src.len();
100 }
101
102 if to_write == 0 {
103 return 0;
104 }
105
106 let writer_offset = writer_counter & (self.work_buf_len - 1);
107 if (writer_offset + to_write) <= self.work_buf_len {
108 self.work_buf[writer_offset..(writer_offset + to_write)]
109 .copy_from_slice(&src[0..to_write]);
110 self.writer_counter().fetch_add(to_write, Ordering::Release);
111 return to_write;
112 }
113
114 let first_write = self.work_buf_len - writer_offset;
115 self.work_buf[writer_offset..self.work_buf_len].copy_from_slice(&src[0..first_write]);
116
117 let second_write = to_write - first_write;
118 self.work_buf[0..second_write].copy_from_slice(&src[first_write..to_write]);
119
120 self.writer_counter().fetch_add(to_write, Ordering::Release);
121 to_write
122 }
123
124 fn read(&mut self, dst: &mut [u8]) -> usize {
125 let writer_counter = self.writer_counter().load(Ordering::Acquire);
126 let reader_counter = self.reader_counter().load(Ordering::Relaxed);
127
128 let mut to_read = writer_counter - reader_counter;
129
130 if to_read > dst.len() {
131 to_read = dst.len();
132 }
133
134 if to_read == 0 {
135 return 0;
136 }
137
138 let reader_offset = reader_counter & (self.work_buf_len - 1);
139 if (reader_offset + to_read) <= self.work_buf_len {
140 (&mut *dst)[0..to_read]
141 .copy_from_slice(&self.work_buf[reader_offset..(reader_offset + to_read)]);
142 self.reader_counter().fetch_add(to_read, Ordering::Release);
143 return to_read;
144 }
145
146 let first_read = self.work_buf_len - reader_offset;
147 (&mut *dst)[0..first_read]
148 .copy_from_slice(&self.work_buf[reader_offset..self.work_buf_len]);
149
150 let second_read = to_read - first_read;
151 (&mut *dst)[first_read..to_read].copy_from_slice(&self.work_buf[0..second_read]);
152
153 self.reader_counter().fetch_add(to_read, Ordering::Release);
154 to_read
155 }
156
157 fn unwrite(&mut self) -> usize {
159 let writer_counter = self.writer_counter().load(Ordering::Acquire);
160 let reader_counter = self.reader_counter().load(Ordering::Relaxed);
161
162 if writer_counter == reader_counter {
163 return 0;
164 }
165
166 self.writer_counter()
167 .store(reader_counter, Ordering::Release);
168
169 writer_counter - reader_counter
170 }
171}
172
173pub struct Reader {
174 buffer: PipeBuffer,
175}
176
177pub struct Writer {
178 buffer: PipeBuffer,
179}
180
181const fn is_power_of_two(val: usize) -> bool {
182 (val & (val - 1)) == 0
183}
184
185impl Reader {
186 pub unsafe fn new(pipe_data: RawPipeData) -> Reader {
187 Reader {
188 buffer: PipeBuffer::new(
189 pipe_data.buf_addr,
190 pipe_data.buf_size,
191 SysHandle::from_u64(pipe_data.ipc_handle),
192 ),
193 }
194 }
195
196 pub fn read(&mut self, buf: &mut [u8]) -> Result<usize, ErrorCode> {
197 self.buffer.assert_invariants();
198 if buf.len() == 0 {
199 return Err(ErrorCode::InvalidArgument);
200 }
201
202 'outer: loop {
205 while !self.buffer.can_read() {
206 if self.buffer.error_code.is_err() {
207 break 'outer;
208 }
209 if let Err(e) = SysCpu::wait(
210 &mut [self.buffer.ipc_handle],
211 self.buffer.ipc_handle,
212 SysHandle::NONE,
213 None,
214 ) {
215 self.buffer.error_code = e;
216 break 'outer;
217 }
218 }
219 let read = self.buffer.read(buf);
220 if read > 0 {
221 if self.buffer.error_code.is_err() {
222 return Ok(read);
223 }
224 if let Err(e) = SysCpu::wake(self.buffer.ipc_handle) {
225 self.buffer.error_code = e;
227 }
228 return Ok(read);
229 }
230 }
231
232 let read = self.buffer.read(buf);
235 if read > 0 {
236 return Ok(read);
237 }
238
239 Err(self.buffer.error_code)
240 }
241
242 pub fn total_read(&self) -> usize {
243 self.buffer.reader_counter().load(Ordering::Relaxed)
244 }
245}
246
247impl Writer {
248 pub unsafe fn new(pipe_data: RawPipeData) -> Writer {
249 Writer {
250 buffer: PipeBuffer::new(
251 pipe_data.buf_addr,
252 pipe_data.buf_size,
253 SysHandle::from_u64(pipe_data.ipc_handle),
254 ),
255 }
256 }
257
258 pub fn write(&mut self, buf: &[u8]) -> Result<usize, ErrorCode> {
259 if self.buffer.error_code.is_err() {
260 return Err(self.buffer.error_code);
261 }
262 self.buffer.assert_invariants();
263 if buf.len() == 0 {
264 return Err(ErrorCode::InvalidArgument);
265 }
266
267 let mut written = 0_usize;
268
269 loop {
270 while !self.buffer.can_write() {
271 if let Err(err) = SysCpu::wait(
272 &mut [self.buffer.ipc_handle],
273 self.buffer.ipc_handle,
274 SysHandle::NONE,
275 None,
276 ) {
277 self.buffer.error_code = err;
278 written = written.checked_sub(self.buffer.unwrite()).unwrap_or(0);
279 if written > 0 {
280 return Ok(written);
281 } else {
282 return Err(err);
283 }
284 }
285 }
286
287 written += self.buffer.write(&buf[written..]);
288 if written == buf.len() {
289 if let Err(err) = SysCpu::wake(self.buffer.ipc_handle) {
290 self.buffer.error_code = err;
292 written = written.checked_sub(self.buffer.unwrite()).unwrap_or(0);
293 if written > 0 {
294 return Ok(written);
295 } else {
296 return Err(err);
297 }
298 }
299 return Ok(written);
300 }
301 }
302 }
303
304 pub fn total_written(&self) -> usize {
305 self.buffer.writer_counter().load(Ordering::Relaxed)
306 }
307}
308
309pub enum Pipe {
310 Reader(Reader),
311 Writer(Writer),
312 Empty,
313 Null,
314}
315
316impl Pipe {
317 pub const fn new() -> Self {
318 Self::Empty
319 }
320
321 pub const fn empty(&self) -> bool {
322 match self {
323 Self::Empty => true,
324 _ => false,
325 }
326 }
327
328 pub fn read(&mut self, buf: &mut [u8]) -> Result<usize, ErrorCode> {
329 match self {
330 Self::Reader(reader) => reader.read(buf),
331 Self::Null => Ok(0),
332 _ => Err(ErrorCode::InvalidArgument),
333 }
334 }
335
336 pub fn read_to_end(&mut self, buf: &mut alloc::vec::Vec<u8>) -> Result<usize, ErrorCode> {
337 let mut temp_vec = alloc::vec::Vec::new();
338 let mut size = 0_usize;
339 loop {
340 temp_vec.resize(256, 0_u8);
341 if let Ok(sz) = self.read(&mut temp_vec[..]) {
342 if sz == 0 {
343 return Ok(size);
344 }
345 size += sz;
346 temp_vec.truncate(sz);
347 buf.append(&mut temp_vec);
348 } else {
349 if size != 0 {
350 return Ok(size);
351 } else {
352 return Err(ErrorCode::InvalidArgument);
353 }
354 }
355 }
356 }
357
358 pub fn write(&mut self, buf: &[u8]) -> Result<usize, ErrorCode> {
359 match self {
360 Self::Writer(writer) => writer.write(buf),
361 Self::Null => Ok(0),
362 _ => Err(ErrorCode::InvalidArgument),
363 }
364 }
365}
366
367pub struct RawPipeData {
368 pub buf_addr: usize,
369 pub buf_size: usize,
370 pub ipc_handle: u64,
371}
372
373impl RawPipeData {
374 pub unsafe fn release(self, owner_process: SysHandle) {
376 moto_sys::syscalls::SysCtl::put_remote(
377 owner_process,
378 SysHandle::from_u64(self.ipc_handle),
379 )
380 .unwrap();
381
382 moto_sys::syscalls::SysMem::unmap(owner_process, 0, u64::MAX, self.buf_addr as u64)
383 .unwrap();
384 }
385
386 pub fn unsafe_copy(&self) -> Self {
387 Self {
388 buf_addr: self.buf_addr,
389 buf_size: self.buf_size,
390 ipc_handle: self.ipc_handle,
391 }
392 }
393}
394
395pub fn make_pair(
397 process_1: SysHandle,
398 process_2: SysHandle,
399) -> Result<(RawPipeData, RawPipeData), ErrorCode> {
400 use moto_sys::syscalls::*;
401
402 let remote_process = if process_1 == SysHandle::SELF {
403 process_2
404 } else {
405 process_1
406 };
407 let flags = SysMem::F_SHARE_SELF | SysMem::F_READABLE | SysMem::F_WRITABLE;
408 let (remote, local) = SysMem::map2(
409 remote_process,
410 flags,
411 u64::MAX,
412 u64::MAX,
413 SysMem::PAGE_SIZE_SMALL,
414 1,
415 )?;
416
417 let (h1, h2) = SysCtl::create_ipc_pair(process_1, process_2, 0).map_err(|err| {
418 SysMem::unmap(remote_process, 0, u64::MAX, remote).unwrap();
419
420 SysMem::unmap(SysHandle::SELF, 0, u64::MAX, local).unwrap();
421
422 err
423 })?;
424
425 if process_1 == SysHandle::SELF {
426 Ok((
427 RawPipeData {
428 buf_addr: local as usize,
429 buf_size: SysMem::PAGE_SIZE_SMALL as usize,
430 ipc_handle: h1.as_u64(),
431 },
432 RawPipeData {
433 buf_addr: remote as usize,
434 buf_size: SysMem::PAGE_SIZE_SMALL as usize,
435 ipc_handle: h2.as_u64(),
436 },
437 ))
438 } else {
439 Ok((
440 RawPipeData {
441 buf_addr: remote as usize,
442 buf_size: SysMem::PAGE_SIZE_SMALL as usize,
443 ipc_handle: h1.as_u64(),
444 },
445 RawPipeData {
446 buf_addr: local as usize,
447 buf_size: SysMem::PAGE_SIZE_SMALL as usize,
448 ipc_handle: h2.as_u64(),
449 },
450 ))
451 }
452}