// moto_ipc/sync_pipe.rs

1use core::sync::atomic::{AtomicUsize, Ordering};
2
3use moto_sys::{syscalls::SysCpu, ErrorCode, SysHandle};
4
// One direction of a simplex pipe: a shared-memory mapping containing two
// counter cache lines followed by a power-of-two data ring.
struct PipeBuffer {
    buf_addr: usize,             // Base address of the shared mapping (cacheline-aligned).
    work_buf_len: usize,         // Length of the data ring (half of the mapping).
    work_buf: &'static mut [u8], // The data ring, at buf_addr + DATA_OFFSET.
    error_code: ErrorCode,       // Cached error from a failed wait/wake; Ok while healthy.
    ipc_handle: SysHandle,       // Handle used to wait for / wake the remote end.
}
12
impl Drop for PipeBuffer {
    fn drop(&mut self) {
        // Wake the remote end one last time so it can observe that this side
        // is gone; skipped if we already hit an IPC error (the remote is
        // presumably dead, and waking it would fail anyway).
        if self.error_code.is_ok() {
            SysCpu::wake(self.ipc_handle).ok();
        }
        // Release the IPC handle first, then unmap the shared buffer.
        moto_sys::syscalls::SysCtl::put(self.ipc_handle).unwrap();
        moto_sys::syscalls::SysMem::unmap(SysHandle::SELF, 0, u64::MAX, self.buf_addr as u64)
            .unwrap();
    }
}
23
// Single-producer/single-consumer ring buffer over shared memory.
//
// Both counters increase monotonically (bytes total, not ring offsets); the
// ring offset is obtained by masking with (work_buf_len - 1). Invariant:
//   reader_counter <= writer_counter <= reader_counter + work_buf_len.
// Counters are assumed never to wrap in practice (usize on a 64-bit target —
// TODO confirm the target is 64-bit).
impl PipeBuffer {
    const CACHELINE_SIZE: usize = 64;
    // Place reader/writer counters on their own cache lines.
    const READER_COUNTER_OFFSET: usize = 0;
    const WRITER_COUNTER_OFFSET: usize = Self::CACHELINE_SIZE;
    // The data ring starts after the two counter cache lines.
    const DATA_OFFSET: usize = Self::CACHELINE_SIZE * 2;

    // Layout-version stamp, 16 bytes into the reader's cache line; new()
    // asserts it is zero — the only version this code understands.
    const VERSION_OFFSET: usize = Self::READER_COUNTER_OFFSET + 16;

    // Wraps the shared mapping at `buf_addr` (of `buf_size` bytes) as a pipe
    // buffer. The data ring is the second half of the mapping, so a
    // power-of-two `buf_size` keeps the ring length a power of two as well.
    //
    // Safety: buf_addr..buf_addr + buf_size must be a live, writable mapping
    // that outlives the returned value (a &'static slice is fabricated from
    // it), and `ipc_handle` must be a valid handle whose ownership passes to
    // the PipeBuffer (released in drop()).
    unsafe fn new(buf_addr: usize, buf_size: usize, ipc_handle: SysHandle) -> Self {
        assert!(buf_addr & (Self::CACHELINE_SIZE - 1) == 0); // Require cacheline alignment.
        assert!(buf_size & (Self::CACHELINE_SIZE - 1) == 0); // Require cacheline alignment.
        // Header plus data ring must fit inside the mapping.
        assert!((buf_size >> 1) + Self::DATA_OFFSET < buf_size);
        assert!(is_power_of_two(buf_size));

        // Only layout version 0 is supported.
        assert!(Self::version(buf_addr) == 0);

        let work_buf_len = buf_size >> 1;
        PipeBuffer {
            buf_addr,
            work_buf_len,
            work_buf: core::slice::from_raw_parts_mut(
                (buf_addr + Self::DATA_OFFSET) as *mut u8,
                work_buf_len,
            ),
            error_code: ErrorCode::Ok,
            ipc_handle,
        }
    }

    // Reads the layout-version stamp from a raw (not yet wrapped) mapping.
    fn version(buf_addr: usize) -> u64 {
        unsafe {
            let addr = buf_addr + Self::VERSION_OFFSET;
            *(addr as *const u64).as_ref().unwrap_unchecked()
        }
    }

    // Total bytes consumed by the reader, viewed as an atomic word inside
    // the shared mapping.
    fn reader_counter(&self) -> &AtomicUsize {
        unsafe {
            let addr = self.buf_addr + Self::READER_COUNTER_OFFSET;
            (addr as *const AtomicUsize).as_ref().unwrap_unchecked()
        }
    }

    // Total bytes produced by the writer, viewed as an atomic word inside
    // the shared mapping.
    fn writer_counter(&self) -> &AtomicUsize {
        unsafe {
            let addr = self.buf_addr + Self::WRITER_COUNTER_OFFSET;
            (addr as *const AtomicUsize).as_ref().unwrap_unchecked()
        }
    }

    // Sanity check: the reader can never be ahead of the writer.
    fn assert_invariants(&self) {
        assert!(
            self.reader_counter().load(Ordering::Relaxed)
                <= self.writer_counter().load(Ordering::Relaxed)
        );
    }

    // Hint used by the read loop: unread bytes exist. Relaxed is fine — a
    // stale answer only causes an extra wait/retry, not data corruption.
    fn can_read(&self) -> bool {
        self.reader_counter().load(Ordering::Relaxed)
            < self.writer_counter().load(Ordering::Relaxed)
    }

    // Hint used by the write loop: free space exists in the ring.
    fn can_write(&self) -> bool {
        self.writer_counter().load(Ordering::Relaxed)
            < ((self.reader_counter().load(Ordering::Relaxed)) + self.work_buf_len)
    }

    // Copies as much of `src` as currently fits into the ring; returns the
    // number of bytes copied (0 when the ring is full). Non-blocking.
    fn write(&mut self, src: &[u8]) -> usize {
        // Acquire pairs with the reader's Release in read(): space freed by
        // the reader is observed before we overwrite it.
        let reader_counter = self.reader_counter().load(Ordering::Acquire);
        let writer_counter = self.writer_counter().load(Ordering::Relaxed);

        // Free space in the ring.
        let mut to_write = reader_counter + self.work_buf_len - writer_counter;

        if to_write > src.len() {
            to_write = src.len();
        }

        if to_write == 0 {
            return 0;
        }

        // work_buf_len is a power of two, so masking yields the ring offset.
        let writer_offset = writer_counter & (self.work_buf_len - 1);
        if (writer_offset + to_write) <= self.work_buf_len {
            // Contiguous region: a single copy.
            self.work_buf[writer_offset..(writer_offset + to_write)]
                .copy_from_slice(&src[0..to_write]);
            // Release publishes the copied bytes before the counter bump.
            self.writer_counter().fetch_add(to_write, Ordering::Release);
            return to_write;
        }

        // The write wraps past the end of the ring: copy in two pieces.
        let first_write = self.work_buf_len - writer_offset;
        self.work_buf[writer_offset..self.work_buf_len].copy_from_slice(&src[0..first_write]);

        let second_write = to_write - first_write;
        self.work_buf[0..second_write].copy_from_slice(&src[first_write..to_write]);

        self.writer_counter().fetch_add(to_write, Ordering::Release);
        to_write
    }

    // Copies as many unread bytes as fit into `dst`; returns the number of
    // bytes copied (0 when the ring is empty). Non-blocking.
    fn read(&mut self, dst: &mut [u8]) -> usize {
        // Acquire pairs with the writer's Release in write(): bytes are
        // observed before the counter that announces them.
        let writer_counter = self.writer_counter().load(Ordering::Acquire);
        let reader_counter = self.reader_counter().load(Ordering::Relaxed);

        // Unread bytes available.
        let mut to_read = writer_counter - reader_counter;

        if to_read > dst.len() {
            to_read = dst.len();
        }

        if to_read == 0 {
            return 0;
        }

        // Mask the monotonic counter down to a ring offset.
        let reader_offset = reader_counter & (self.work_buf_len - 1);
        if (reader_offset + to_read) <= self.work_buf_len {
            // Contiguous region: a single copy.
            (&mut *dst)[0..to_read]
                .copy_from_slice(&self.work_buf[reader_offset..(reader_offset + to_read)]);
            // Release publishes the freed space to the writer.
            self.reader_counter().fetch_add(to_read, Ordering::Release);
            return to_read;
        }

        // The read wraps past the end of the ring: copy in two pieces.
        let first_read = self.work_buf_len - reader_offset;
        (&mut *dst)[0..first_read]
            .copy_from_slice(&self.work_buf[reader_offset..self.work_buf_len]);

        let second_read = to_read - first_read;
        (&mut *dst)[first_read..to_read].copy_from_slice(&self.work_buf[0..second_read]);

        self.reader_counter().fetch_add(to_read, Ordering::Release);
        to_read
    }

    // Assuming the reader is gone, discard the bytes it never consumed by
    // rolling writer_counter back to reader_counter. Returns how many bytes
    // were discarded so the caller can adjust its "written" count.
    fn unwrite(&mut self) -> usize {
        let writer_counter = self.writer_counter().load(Ordering::Acquire);
        let reader_counter = self.reader_counter().load(Ordering::Relaxed);

        if writer_counter == reader_counter {
            return 0;
        }

        self.writer_counter()
            .store(reader_counter, Ordering::Release);

        writer_counter - reader_counter
    }
}
172
/// The read end of a simplex pipe.
pub struct Reader {
    buffer: PipeBuffer,
}
176
/// The write end of a simplex pipe.
pub struct Writer {
    buffer: PipeBuffer,
}
180
/// Returns `true` iff `val` is a power of two.
///
/// Zero is explicitly rejected: the previous check `(val & (val - 1)) == 0`
/// accepted 0, and `0 - 1` underflows `usize` (a panic in debug builds).
const fn is_power_of_two(val: usize) -> bool {
    val != 0 && (val & (val - 1)) == 0
}
184
185impl Reader {
186    pub unsafe fn new(pipe_data: RawPipeData) -> Reader {
187        Reader {
188            buffer: PipeBuffer::new(
189                pipe_data.buf_addr,
190                pipe_data.buf_size,
191                SysHandle::from_u64(pipe_data.ipc_handle),
192            ),
193        }
194    }
195
196    pub fn read(&mut self, buf: &mut [u8]) -> Result<usize, ErrorCode> {
197        self.buffer.assert_invariants();
198        if buf.len() == 0 {
199            return Err(ErrorCode::InvalidArgument);
200        }
201
202        // Even if the remote end is gone (self.buffer.error_code.is_err()),
203        // we should complete reading bytes left in the buffer.
204        'outer: loop {
205            while !self.buffer.can_read() {
206                if self.buffer.error_code.is_err() {
207                    break 'outer;
208                }
209                if let Err(e) = SysCpu::wait(
210                    &mut [self.buffer.ipc_handle],
211                    self.buffer.ipc_handle,
212                    SysHandle::NONE,
213                    None,
214                ) {
215                    self.buffer.error_code = e;
216                    break 'outer;
217                }
218            }
219            let read = self.buffer.read(buf);
220            if read > 0 {
221                if self.buffer.error_code.is_err() {
222                    return Ok(read);
223                }
224                if let Err(e) = SysCpu::wake(self.buffer.ipc_handle) {
225                    // Cache the error.
226                    self.buffer.error_code = e;
227                }
228                return Ok(read);
229            }
230        }
231
232        // One last read: if the remote process wrote something
233        // and then exited, we don't want to lose that.
234        let read = self.buffer.read(buf);
235        if read > 0 {
236            return Ok(read);
237        }
238
239        Err(self.buffer.error_code)
240    }
241
242    pub fn total_read(&self) -> usize {
243        self.buffer.reader_counter().load(Ordering::Relaxed)
244    }
245}
246
247impl Writer {
248    pub unsafe fn new(pipe_data: RawPipeData) -> Writer {
249        Writer {
250            buffer: PipeBuffer::new(
251                pipe_data.buf_addr,
252                pipe_data.buf_size,
253                SysHandle::from_u64(pipe_data.ipc_handle),
254            ),
255        }
256    }
257
258    pub fn write(&mut self, buf: &[u8]) -> Result<usize, ErrorCode> {
259        if self.buffer.error_code.is_err() {
260            return Err(self.buffer.error_code);
261        }
262        self.buffer.assert_invariants();
263        if buf.len() == 0 {
264            return Err(ErrorCode::InvalidArgument);
265        }
266
267        let mut written = 0_usize;
268
269        loop {
270            while !self.buffer.can_write() {
271                if let Err(err) = SysCpu::wait(
272                    &mut [self.buffer.ipc_handle],
273                    self.buffer.ipc_handle,
274                    SysHandle::NONE,
275                    None,
276                ) {
277                    self.buffer.error_code = err;
278                    written = written.checked_sub(self.buffer.unwrite()).unwrap_or(0);
279                    if written > 0 {
280                        return Ok(written);
281                    } else {
282                        return Err(err);
283                    }
284                }
285            }
286
287            written += self.buffer.write(&buf[written..]);
288            if written == buf.len() {
289                if let Err(err) = SysCpu::wake(self.buffer.ipc_handle) {
290                    // Cache the error.
291                    self.buffer.error_code = err;
292                    written = written.checked_sub(self.buffer.unwrite()).unwrap_or(0);
293                    if written > 0 {
294                        return Ok(written);
295                    } else {
296                        return Err(err);
297                    }
298                }
299                return Ok(written);
300            }
301        }
302    }
303
304    pub fn total_written(&self) -> usize {
305        self.buffer.writer_counter().load(Ordering::Relaxed)
306    }
307}
308
/// A pipe endpoint: one end of a simplex pipe, or a placeholder.
pub enum Pipe {
    Reader(Reader), // The read end.
    Writer(Writer), // The write end.
    Empty,          // Not attached; read/write fail with InvalidArgument.
    Null,           // Null device: read/write succeed with 0 bytes.
}
315
316impl Pipe {
317    pub const fn new() -> Self {
318        Self::Empty
319    }
320
321    pub const fn empty(&self) -> bool {
322        match self {
323            Self::Empty => true,
324            _ => false,
325        }
326    }
327
328    pub fn read(&mut self, buf: &mut [u8]) -> Result<usize, ErrorCode> {
329        match self {
330            Self::Reader(reader) => reader.read(buf),
331            Self::Null => Ok(0),
332            _ => Err(ErrorCode::InvalidArgument),
333        }
334    }
335
336    pub fn read_to_end(&mut self, buf: &mut alloc::vec::Vec<u8>) -> Result<usize, ErrorCode> {
337        let mut temp_vec = alloc::vec::Vec::new();
338        let mut size = 0_usize;
339        loop {
340            temp_vec.resize(256, 0_u8);
341            if let Ok(sz) = self.read(&mut temp_vec[..]) {
342                if sz == 0 {
343                    return Ok(size);
344                }
345                size += sz;
346                temp_vec.truncate(sz);
347                buf.append(&mut temp_vec);
348            } else {
349                if size != 0 {
350                    return Ok(size);
351                } else {
352                    return Err(ErrorCode::InvalidArgument);
353                }
354            }
355        }
356    }
357
358    pub fn write(&mut self, buf: &[u8]) -> Result<usize, ErrorCode> {
359        match self {
360            Self::Writer(writer) => writer.write(buf),
361            Self::Null => Ok(0),
362            _ => Err(ErrorCode::InvalidArgument),
363        }
364    }
365}
366
/// Raw description of one end of a pipe: the shared mapping and IPC handle,
/// before being wrapped into a Reader or Writer.
pub struct RawPipeData {
    pub buf_addr: usize,  // Address of the shared mapping in the owning process.
    pub buf_size: usize,  // Size of the mapping in bytes.
    pub ipc_handle: u64,  // Raw IPC handle value (wrapped via SysHandle::from_u64).
}
372
impl RawPipeData {
    // Release self (memory, handle).
    //
    // Safety: `owner_process` must be the process this handle and mapping
    // belong to; after this call both resources are gone, so no other copy
    // of this descriptor (see unsafe_copy) may be used or released again.
    pub unsafe fn release(self, owner_process: SysHandle) {
        moto_sys::syscalls::SysCtl::put_remote(
            owner_process,
            SysHandle::from_u64(self.ipc_handle),
        )
        .unwrap();

        moto_sys::syscalls::SysMem::unmap(owner_process, 0, u64::MAX, self.buf_addr as u64)
            .unwrap();
    }

    // Duplicates the descriptor WITHOUT duplicating the underlying handle or
    // mapping: both copies refer to the same resources, so only one of them
    // may ever be released/consumed — hence "unsafe_copy".
    pub fn unsafe_copy(&self) -> Self {
        Self {
            buf_addr: self.buf_addr,
            buf_size: self.buf_size,
            ipc_handle: self.ipc_handle,
        }
    }
}
394
/// Makes a simplex pipe: one small shared page plus an IPC handle pair.
///
/// One of `process_1`/`process_2` must be `SysHandle::SELF`; the other is
/// the remote process the page is shared with. Returns the two raw pipe
/// descriptors in (process_1, process_2) order. If creating the IPC pair
/// fails, the shared page is unmapped on both sides before the error is
/// returned.
pub fn make_pair(
    process_1: SysHandle,
    process_2: SysHandle,
) -> Result<(RawPipeData, RawPipeData), ErrorCode> {
    use moto_sys::syscalls::*;

    // Whichever side is not SELF is the remote peer.
    // NOTE(review): assumes exactly one of the two handles is SELF, per the
    // doc comment; if neither is, process_1 is silently treated as remote.
    let remote_process = if process_1 == SysHandle::SELF {
        process_2
    } else {
        process_1
    };
    let flags = SysMem::F_SHARE_SELF | SysMem::F_READABLE | SysMem::F_WRITABLE;
    // Map one small page into both processes: `remote` is its address in the
    // remote process, `local` its address in this process.
    let (remote, local) = SysMem::map2(
        remote_process,
        flags,
        u64::MAX,
        u64::MAX,
        SysMem::PAGE_SIZE_SMALL,
        1,
    )?;

    // On IPC-pair failure, undo both mappings before propagating the error.
    let (h1, h2) = SysCtl::create_ipc_pair(process_1, process_2, 0).map_err(|err| {
        SysMem::unmap(remote_process, 0, u64::MAX, remote).unwrap();

        SysMem::unmap(SysHandle::SELF, 0, u64::MAX, local).unwrap();

        err
    })?;

    // Pair up each process with its own view of the page and its handle,
    // preserving (process_1, process_2) order in the returned tuple.
    if process_1 == SysHandle::SELF {
        Ok((
            RawPipeData {
                buf_addr: local as usize,
                buf_size: SysMem::PAGE_SIZE_SMALL as usize,
                ipc_handle: h1.as_u64(),
            },
            RawPipeData {
                buf_addr: remote as usize,
                buf_size: SysMem::PAGE_SIZE_SMALL as usize,
                ipc_handle: h2.as_u64(),
            },
        ))
    } else {
        Ok((
            RawPipeData {
                buf_addr: remote as usize,
                buf_size: SysMem::PAGE_SIZE_SMALL as usize,
                ipc_handle: h1.as_u64(),
            },
            RawPipeData {
                buf_addr: local as usize,
                buf_size: SysMem::PAGE_SIZE_SMALL as usize,
                ipc_handle: h2.as_u64(),
            },
        ))
    }
}