1mod future;
4mod utils;
5
6use std::io;
7
8pub use future::*;
9pub use mem_ring::{Queue, QueueMeta, ReadQueue, WriteQueue};
10pub use slab::Slab;
11pub use utils::*;
12
13pub type TaskHandler = fn(usize, TaskDesc) -> bool;
14
15#[repr(C)]
16#[derive(Debug, Clone, Copy)]
17pub struct Payload {
18 pub ptr: usize,
21 pub user_data: usize,
24 pub next_user_data: usize,
26 pub call_id: u32,
28 pub flag: u32,
44}
45
46impl Payload {
47 const CALL: u32 = 0b0101;
48 const REPLY: u32 = 0b1110;
49 const DROP: u32 = 0b1000;
50 const QUIT_INIT: u32 = 0b10100;
51 const QUIT_ACK: u32 = 0b10000;
52
53 #[inline]
54 pub const fn new_call(call_id: u32, user_data: usize, ptr: usize) -> Self {
55 Self {
56 ptr,
57 user_data,
58 next_user_data: 0,
59 call_id,
60 flag: Self::CALL,
61 }
62 }
63
64 #[inline]
65 pub fn new_reply(call_id: u32, user_data: usize, next_user_data: usize, ptr: usize) -> Self {
66 Self {
67 ptr,
68 user_data,
69 next_user_data,
70 call_id,
71 flag: Self::REPLY,
72 }
73 }
74
75 #[inline]
76 pub fn new_drop(call_id: u32, user_data: usize) -> Self {
77 Self {
78 ptr: 0,
79 user_data,
80 next_user_data: 0,
81 call_id,
82 flag: Self::DROP,
83 }
84 }
85
86 #[inline]
87 pub const fn new_quit_init() -> Self {
88 Self {
89 ptr: 0,
90 user_data: 0,
91 next_user_data: 0,
92 call_id: 0,
93 flag: Self::QUIT_INIT,
94 }
95 }
96
97 #[inline]
98 pub const fn new_quit_ack() -> Self {
99 Self {
100 ptr: 0,
101 user_data: 0,
102 next_user_data: 0,
103 call_id: 0,
104 flag: Self::QUIT_ACK,
105 }
106 }
107}
108
109pub struct TaskDesc {
110 pub buf: Vec<u8>,
111 pub params_ptr: usize,
112 pub slot_ptr: usize,
113}
114
115pub unsafe fn init_mem_ffi<const N: usize>(
119 peer_init_function_pointer: *const (),
120 size: usize,
121 handlers: [TaskHandler; N],
122) -> (WriteQueue<Payload>, SharedSlab) {
123 let (read_queue, write_queue) =
124 init_rings(peer_init_function_pointer, size).expect("unable to init ring");
125
126 #[cfg(all(feature = "tokio", not(feature = "monoio")))]
127 let shared_slab = std::sync::Arc::new(std::sync::Mutex::new(Slab::new()));
128 #[cfg(not(all(feature = "tokio", not(feature = "monoio"))))]
129 let shared_slab = std::rc::Rc::new(std::cell::UnsafeCell::new(Slab::new()));
130
131 let wq = write_queue.clone();
132 let sb = shared_slab.clone();
133 let guard = read_queue
134 .run_handler(move |payload: Payload| {
135 if payload.flag & Payload::QUIT_ACK == Payload::QUIT_ACK {
136 return;
137 }
138 let Some(call_handle) = handlers.get(payload.call_id as usize) else {
139 panic!("call handler {} not found", payload.call_id);
140 };
141 let sid = payload.user_data;
142 let desc = {
143 #[cfg(not(all(feature = "tokio", not(feature = "monoio"))))]
144 let locked = unsafe { &mut *sb.get() };
145 #[cfg(all(feature = "tokio", not(feature = "monoio")))]
146 let mut locked = sb.lock().unwrap();
147 locked.remove(sid)
148 };
149
150 if call_handle(payload.ptr, desc) {
151 let drop_payload = Payload::new_drop(payload.call_id, payload.next_user_data);
152 wq.push(drop_payload);
153 }
154 })
155 .expect("unable to run ffi handler");
156 Box::leak(Box::new(guard));
157 (write_queue, shared_slab)
158}
159
160#[cfg(not(all(feature = "tokio", not(feature = "monoio"))))]
163pub unsafe fn init_rings<T: 'static>(
164 peer_init_function_pointer: *const (),
165 size: usize,
166) -> Result<(ReadQueue<T>, WriteQueue<T>), io::Error> {
167 type RingInitFunc = unsafe extern "C" fn(QueueMeta, QueueMeta);
168 let (rqueue, rmeta) = Queue::new(size)?;
169 let (wqueue, wmeta) = Queue::new(size)?;
170
171 let init_func: RingInitFunc = std::mem::transmute(peer_init_function_pointer);
172 init_func(rmeta, wmeta);
173
174 Ok((rqueue.read(), wqueue.write()?))
175}
176
177#[cfg(all(feature = "tokio", not(feature = "monoio")))]
180pub unsafe fn init_rings<T: 'static + Send>(
181 peer_init_function_pointer: *const (),
182 size: usize,
183) -> Result<(ReadQueue<T>, WriteQueue<T>), io::Error> {
184 type RingInitFunc = unsafe extern "C" fn(QueueMeta, QueueMeta);
185 let (rqueue, rmeta) = Queue::new(size)?;
186 let (wqueue, wmeta) = Queue::new(size)?;
187
188 let init_func: RingInitFunc = std::mem::transmute(peer_init_function_pointer);
189 init_func(rmeta, wmeta);
190
191 Ok((rqueue.read(), wqueue.write()?))
192}