1mod future;
2mod utils;
3
4use std::io;
5
6pub use future::*;
7pub use mem_ring::{Queue, QueueMeta, ReadQueue, WriteQueue};
8pub use slab::Slab;
9pub use utils::*;
10
11pub type TaskHandler = fn(usize, TaskDesc) -> bool;
12
13#[repr(C)]
14#[derive(Debug, Clone, Copy)]
15pub struct Payload {
16 pub ptr: usize,
19 pub user_data: usize,
22 pub next_user_data: usize,
24 pub call_id: u32,
26 pub flag: u32,
42}
43
44impl Payload {
45 const CALL: u32 = 0b0101;
46 const REPLY: u32 = 0b1110;
47 const DROP: u32 = 0b1000;
48 const QUIT_INIT: u32 = 0b10100;
49 const QUIT_ACK: u32 = 0b10000;
50
51 #[inline]
52 pub const fn new_call(call_id: u32, user_data: usize, ptr: usize) -> Self {
53 Self {
54 ptr,
55 user_data,
56 next_user_data: 0,
57 call_id,
58 flag: Self::CALL,
59 }
60 }
61
62 #[inline]
63 pub fn new_reply(call_id: u32, user_data: usize, next_user_data: usize, ptr: usize) -> Self {
64 Self {
65 ptr,
66 user_data,
67 next_user_data,
68 call_id,
69 flag: Self::REPLY,
70 }
71 }
72
73 #[inline]
74 pub fn new_drop(call_id: u32, user_data: usize) -> Self {
75 Self {
76 ptr: 0,
77 user_data,
78 next_user_data: 0,
79 call_id,
80 flag: Self::DROP,
81 }
82 }
83
84 #[inline]
85 pub const fn new_quit_init() -> Self {
86 Self {
87 ptr: 0,
88 user_data: 0,
89 next_user_data: 0,
90 call_id: 0,
91 flag: Self::QUIT_INIT,
92 }
93 }
94
95 #[inline]
96 pub const fn new_quit_ack() -> Self {
97 Self {
98 ptr: 0,
99 user_data: 0,
100 next_user_data: 0,
101 call_id: 0,
102 flag: Self::QUIT_ACK,
103 }
104 }
105}
106
107pub struct TaskDesc {
108 pub buf: Vec<u8>,
109 pub params_ptr: usize,
110 pub slot_ptr: usize,
111}
112
113pub unsafe fn init_mem_ffi<const N: usize>(
117 peer_init_function_pointer: *const (),
118 size: usize,
119 handlers: [TaskHandler; N],
120) -> (WriteQueue<Payload>, SharedSlab) {
121 let (read_queue, write_queue) =
122 init_rings(peer_init_function_pointer, size).expect("unable to init ring");
123
124 #[cfg(all(feature = "tokio", not(feature = "monoio")))]
125 let shared_slab = std::sync::Arc::new(std::sync::Mutex::new(Slab::new()));
126 #[cfg(not(all(feature = "tokio", not(feature = "monoio"))))]
127 let shared_slab = std::rc::Rc::new(std::cell::UnsafeCell::new(Slab::new()));
128
129 let wq = write_queue.clone();
130 let sb = shared_slab.clone();
131 let guard = read_queue
132 .run_handler(move |payload: Payload| {
133 if payload.flag & Payload::QUIT_ACK == Payload::QUIT_ACK {
134 return;
135 }
136 let Some(call_handle) = handlers.get(payload.call_id as usize) else {
137 panic!("call handler {} not found", payload.call_id);
138 };
139 let sid = payload.user_data;
140 let desc = {
141 #[cfg(not(all(feature = "tokio", not(feature = "monoio"))))]
142 let locked = unsafe { &mut *sb.get() };
143 #[cfg(all(feature = "tokio", not(feature = "monoio")))]
144 let mut locked = sb.lock().unwrap();
145 locked.remove(sid)
146 };
147
148 if call_handle(payload.ptr, desc) {
149 let drop_payload = Payload::new_drop(payload.call_id, payload.next_user_data);
150 wq.push(drop_payload);
151 }
152 })
153 .expect("unable to run ffi handler");
154 Box::leak(Box::new(guard));
155 (write_queue, shared_slab)
156}
157
158#[cfg(not(all(feature = "tokio", not(feature = "monoio"))))]
161pub unsafe fn init_rings<T: 'static>(
162 peer_init_function_pointer: *const (),
163 size: usize,
164) -> Result<(ReadQueue<T>, WriteQueue<T>), io::Error> {
165 type RingInitFunc = unsafe extern "C" fn(QueueMeta, QueueMeta);
166 let (rqueue, rmeta) = Queue::new(size)?;
167 let (wqueue, wmeta) = Queue::new(size)?;
168
169 let init_func: RingInitFunc = std::mem::transmute(peer_init_function_pointer);
170 init_func(rmeta, wmeta);
171
172 Ok((rqueue.read(), wqueue.write()?))
173}
174
175#[cfg(all(feature = "tokio", not(feature = "monoio")))]
178pub unsafe fn init_rings<T: 'static + Send>(
179 peer_init_function_pointer: *const (),
180 size: usize,
181) -> Result<(ReadQueue<T>, WriteQueue<T>), io::Error> {
182 type RingInitFunc = unsafe extern "C" fn(QueueMeta, QueueMeta);
183 let (rqueue, rmeta) = Queue::new(size)?;
184 let (wqueue, wmeta) = Queue::new(size)?;
185
186 let init_func: RingInitFunc = std::mem::transmute(peer_init_function_pointer);
187 init_func(rmeta, wmeta);
188
189 Ok((rqueue.read(), wqueue.write()?))
190}