rust2go_mem_ffi/
lib.rs

1mod future;
2mod utils;
3
4use std::io;
5
6pub use future::*;
7pub use mem_ring::{Queue, QueueMeta, ReadQueue, WriteQueue};
8pub use slab::Slab;
9pub use utils::*;
10
/// Per-call handler invoked by the read-queue callback installed in `init_mem_ffi`.
///
/// The first argument is the incoming payload's `ptr`; the second is the `TaskDesc` that was
/// removed from the shared slab using the payload's `user_data` as key. When the handler
/// returns `true`, the callback pushes a drop payload back to the peer.
pub type TaskHandler = fn(usize, TaskDesc) -> bool;
12
/// Fixed-layout message exchanged with the peer through the memory rings.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct Payload {
    /// Pointer to the request parameters or the response parameters.
    /// For multiple parameters, these parameters should be put contiguously in memory.
    pub ptr: usize,
    /// For a response, `user_data` should be equal to the request's `user_data`.
    /// For a drop ack, `user_data` should be equal to the response's `next_user_data`.
    pub user_data: usize,
    /// Used for a combined response and drop ack.
    pub next_user_data: usize,
    /// Each call with a different signature should have a unique `call_id`.
    pub call_id: u32,
    /// Bit flags describing the payload, counted from the least significant bit:
    ///
    /// - 1st bit (`0b00001`): contains a request
    /// - 2nd bit (`0b00010`): contains a response
    /// - 3rd bit (`0b00100`): wants the peer to reply
    /// - 4th bit (`0b01000`): the last payload can be dropped
    /// - 5th bit (`0b10000`): wants the peer to quit
    ///
    /// Call-related combinations:
    ///
    /// 1. `0b0101`: call
    /// 2. `0b1110`: response to a normal call
    /// 3. `0b1000`: drop only (for a response)
    ///
    /// For a oneway call: send 1, recv 3.
    /// For a normal call: send 1, recv 2, send 3.
    ///
    /// Quit-related combinations:
    ///
    /// 1. `0b10100`: notify the peer to quit and wait for the peer's quit reply
    /// 2. `0b10000`: notify the peer to quit
    ///
    /// For a quit call: send 1, recv 2.
    pub flag: u32,
}

impl Payload {
    // Flag values for each payload kind; see the documentation of `Payload::flag`.
    const CALL: u32 = 0b0101;
    const REPLY: u32 = 0b1110;
    const DROP: u32 = 0b1000;
    const QUIT_INIT: u32 = 0b10100;
    const QUIT_ACK: u32 = 0b10000;

    /// Builds a call payload (`flag = 0b0101`) for `call_id` whose parameters live at `ptr`.
    ///
    /// `next_user_data` is set to 0.
    #[inline]
    pub const fn new_call(call_id: u32, user_data: usize, ptr: usize) -> Self {
        Self {
            ptr,
            user_data,
            next_user_data: 0,
            call_id,
            flag: Self::CALL,
        }
    }

    /// Builds a reply payload (`flag = 0b1110`) whose response parameters live at `ptr`.
    ///
    /// `const` like the other constructors, so every payload kind can be built in const
    /// contexts.
    #[inline]
    pub const fn new_reply(
        call_id: u32,
        user_data: usize,
        next_user_data: usize,
        ptr: usize,
    ) -> Self {
        Self {
            ptr,
            user_data,
            next_user_data,
            call_id,
            flag: Self::REPLY,
        }
    }

    /// Builds a drop-only payload (`flag = 0b1000`); `ptr` and `next_user_data` are 0.
    ///
    /// `const` like the other constructors, so every payload kind can be built in const
    /// contexts.
    #[inline]
    pub const fn new_drop(call_id: u32, user_data: usize) -> Self {
        Self {
            ptr: 0,
            user_data,
            next_user_data: 0,
            call_id,
            flag: Self::DROP,
        }
    }

    /// Builds the payload that notifies the peer to quit and asks for a quit reply
    /// (`flag = 0b10100`); every other field is 0.
    #[inline]
    pub const fn new_quit_init() -> Self {
        Self {
            ptr: 0,
            user_data: 0,
            next_user_data: 0,
            call_id: 0,
            flag: Self::QUIT_INIT,
        }
    }

    /// Builds the payload that notifies the peer to quit without asking for a reply
    /// (`flag = 0b10000`); every other field is 0.
    #[inline]
    pub const fn new_quit_ack() -> Self {
        Self {
            ptr: 0,
            user_data: 0,
            next_user_data: 0,
            call_id: 0,
            flag: Self::QUIT_ACK,
        }
    }
}
106
/// Per-call state kept in the shared slab until the matching payload arrives.
///
/// The read-queue callback installed by `init_mem_ffi` removes the entry keyed by the
/// payload's `user_data` and passes it by value to the `TaskHandler`.
pub struct TaskDesc {
    // NOTE(review): presumably the buffer that owns the call's serialized parameters — confirm
    // against the code that inserts into the slab.
    pub buf: Vec<u8>,
    // NOTE(review): meaning not visible in this file; looks like an address related to the
    // parameters — confirm.
    pub params_ptr: usize,
    // NOTE(review): meaning not visible in this file; looks like an address of the slot that
    // receives the result — confirm.
    pub slot_ptr: usize,
}
112
113/// # Safety
114/// peer_init_function_pointer must be a valid function.
115// Must be called for each thread.
116pub unsafe fn init_mem_ffi<const N: usize>(
117    peer_init_function_pointer: *const (),
118    size: usize,
119    handlers: [TaskHandler; N],
120) -> (WriteQueue<Payload>, SharedSlab) {
121    let (read_queue, write_queue) =
122        init_rings(peer_init_function_pointer, size).expect("unable to init ring");
123
124    #[cfg(all(feature = "tokio", not(feature = "monoio")))]
125    let shared_slab = std::sync::Arc::new(std::sync::Mutex::new(Slab::new()));
126    #[cfg(not(all(feature = "tokio", not(feature = "monoio"))))]
127    let shared_slab = std::rc::Rc::new(std::cell::UnsafeCell::new(Slab::new()));
128
129    let wq = write_queue.clone();
130    let sb = shared_slab.clone();
131    let guard = read_queue
132        .run_handler(move |payload: Payload| {
133            if payload.flag & Payload::QUIT_ACK == Payload::QUIT_ACK {
134                return;
135            }
136            let Some(call_handle) = handlers.get(payload.call_id as usize) else {
137                panic!("call handler {} not found", payload.call_id);
138            };
139            let sid = payload.user_data;
140            let desc = {
141                #[cfg(not(all(feature = "tokio", not(feature = "monoio"))))]
142                let locked = unsafe { &mut *sb.get() };
143                #[cfg(all(feature = "tokio", not(feature = "monoio")))]
144                let mut locked = sb.lock().unwrap();
145                locked.remove(sid)
146            };
147
148            if call_handle(payload.ptr, desc) {
149                let drop_payload = Payload::new_drop(payload.call_id, payload.next_user_data);
150                wq.push(drop_payload);
151            }
152        })
153        .expect("unable to run ffi handler");
154    Box::leak(Box::new(guard));
155    (write_queue, shared_slab)
156}
157
158/// # Safety
159/// peer_init_function_pointer must be a valid function.
160#[cfg(not(all(feature = "tokio", not(feature = "monoio"))))]
161pub unsafe fn init_rings<T: 'static>(
162    peer_init_function_pointer: *const (),
163    size: usize,
164) -> Result<(ReadQueue<T>, WriteQueue<T>), io::Error> {
165    type RingInitFunc = unsafe extern "C" fn(QueueMeta, QueueMeta);
166    let (rqueue, rmeta) = Queue::new(size)?;
167    let (wqueue, wmeta) = Queue::new(size)?;
168
169    let init_func: RingInitFunc = std::mem::transmute(peer_init_function_pointer);
170    init_func(rmeta, wmeta);
171
172    Ok((rqueue.read(), wqueue.write()?))
173}
174
/// Creates two queues of `size`, passes both queue metas to the peer's init function, and
/// returns the read side of the first queue and the write side of the second.
///
/// This variant is compiled when `tokio` is enabled without `monoio` and requires `T: Send`.
///
/// # Errors
/// Returns the `io::Error` from creating either queue or from opening the write side.
///
/// # Safety
/// peer_init_function_pointer must be a valid function.
#[cfg(all(feature = "tokio", not(feature = "monoio")))]
pub unsafe fn init_rings<T: 'static + Send>(
    peer_init_function_pointer: *const (),
    size: usize,
) -> Result<(ReadQueue<T>, WriteQueue<T>), io::Error> {
    type RingInitFunc = unsafe extern "C" fn(QueueMeta, QueueMeta);

    let (inbound, inbound_meta) = Queue::new(size)?;
    let (outbound, outbound_meta) = Queue::new(size)?;

    // Reinterpret the opaque pointer as the peer's init entry point and hand it both metas.
    let peer_init = std::mem::transmute::<*const (), RingInitFunc>(peer_init_function_pointer);
    peer_init(inbound_meta, outbound_meta);

    let reader = inbound.read();
    let writer = outbound.write()?;
    Ok((reader, writer))
}