Skip to main content

rust2go_mem_ffi/
lib.rs

// Copyright 2024 ihciah. All Rights Reserved.
2
3mod future;
4mod utils;
5
6use std::io;
7
8pub use future::*;
9pub use mem_ring::{Queue, QueueMeta, ReadQueue, WriteQueue};
10pub use slab::Slab;
11pub use utils::*;
12
13pub type TaskHandler = fn(usize, TaskDesc) -> bool;
14
/// Message exchanged with the peer through the rings.
///
/// The layout is fixed with `#[repr(C)]`.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct Payload {
    /// Pointer to the request parameters or to the response parameters.
    ///
    /// When there are multiple parameters they must be laid out contiguously in memory.
    pub ptr: usize,
    /// Correlation value.
    ///
    /// * For a response, it must equal the request's `user_data`.
    /// * For a drop ack, it must equal the response's `next_user_data`.
    pub user_data: usize,
    /// Used when a response is combined with a drop ack.
    pub next_user_data: usize,
    /// Identifies the call; each call with a different signature must have a unique id.
    pub call_id: u32,
    /// Bit flags describing the payload (bit 0 is the lowest bit):
    ///
    /// | bit | meaning when set              |
    /// |-----|-------------------------------|
    /// | 0   | contains a request            |
    /// | 1   | contains a response           |
    /// | 2   | wants the peer to reply       |
    /// | 3   | the last payload can be dropped |
    /// | 4   | wants the peer to quit        |
    ///
    /// Combinations in use:
    ///
    /// 1. `0b0101`: call
    /// 2. `0b1110`: response to a normal call
    /// 3. `0b1000`: drop only (for a response)
    ///
    /// A oneway call sends 1 and receives 3; a normal call sends 1, receives 2, sends 3.
    ///
    /// Quit combinations:
    ///
    /// 1. `0b10100`: notify the peer to quit and wait for its quit reply
    /// 2. `0b10000`: notify the peer to quit
    ///
    /// A quit call sends 1 and receives 2.
    pub flag: u32,
}

impl Payload {
    /// Flag of a call: contains a request and wants a reply.
    const CALL: u32 = 0b0101;
    /// Flag of a response to a normal call.
    const REPLY: u32 = 0b1110;
    /// Flag of a drop-only payload.
    const DROP: u32 = 0b1000;
    /// Flag asking the peer to quit and to reply once it did.
    const QUIT_INIT: u32 = 0b10100;
    /// Flag asking the peer to quit, without expecting a reply.
    const QUIT_ACK: u32 = 0b10000;

    /// Builds a call payload; `next_user_data` is set to `0`.
    #[inline]
    pub const fn new_call(call_id: u32, user_data: usize, ptr: usize) -> Self {
        Self {
            ptr,
            user_data,
            next_user_data: 0,
            call_id,
            flag: Self::CALL,
        }
    }

    /// Builds a response payload for a normal call.
    #[inline]
    pub const fn new_reply(
        call_id: u32,
        user_data: usize,
        next_user_data: usize,
        ptr: usize,
    ) -> Self {
        Self {
            ptr,
            user_data,
            next_user_data,
            call_id,
            flag: Self::REPLY,
        }
    }

    /// Builds a drop-only payload; `ptr` and `next_user_data` are set to `0`.
    #[inline]
    pub const fn new_drop(call_id: u32, user_data: usize) -> Self {
        Self {
            ptr: 0,
            user_data,
            next_user_data: 0,
            call_id,
            flag: Self::DROP,
        }
    }

    /// Builds the payload that asks the peer to quit and reply; every other field is `0`.
    #[inline]
    pub const fn new_quit_init() -> Self {
        Self {
            ptr: 0,
            user_data: 0,
            next_user_data: 0,
            call_id: 0,
            flag: Self::QUIT_INIT,
        }
    }

    /// Builds the payload that asks the peer to quit without a reply; every other field
    /// is `0`.
    #[inline]
    pub const fn new_quit_ack() -> Self {
        Self {
            ptr: 0,
            user_data: 0,
            next_user_data: 0,
            call_id: 0,
            flag: Self::QUIT_ACK,
        }
    }
}
108
/// Per-task record stored in the shared slab and handed to a `TaskHandler` when the
/// matching payload arrives.
#[derive(Debug)]
pub struct TaskDesc {
    /// Owned byte buffer carried with the task.
    // NOTE(review): what the buffer contains is not established in this file; confirm
    // against the code that inserts into the slab.
    pub buf: Vec<u8>,
    /// Address-sized value; presumably the location of the call parameters (TODO confirm).
    pub params_ptr: usize,
    /// Address-sized value; presumably the location of the result slot (TODO confirm).
    pub slot_ptr: usize,
}
114
115/// # Safety
116/// peer_init_function_pointer must be a valid function.
117// Must be called for each thread.
118pub unsafe fn init_mem_ffi<const N: usize>(
119    peer_init_function_pointer: *const (),
120    size: usize,
121    handlers: [TaskHandler; N],
122) -> (WriteQueue<Payload>, SharedSlab) {
123    let (read_queue, write_queue) =
124        init_rings(peer_init_function_pointer, size).expect("unable to init ring");
125
126    #[cfg(all(feature = "tokio", not(feature = "monoio")))]
127    let shared_slab = std::sync::Arc::new(std::sync::Mutex::new(Slab::new()));
128    #[cfg(not(all(feature = "tokio", not(feature = "monoio"))))]
129    let shared_slab = std::rc::Rc::new(std::cell::UnsafeCell::new(Slab::new()));
130
131    let wq = write_queue.clone();
132    let sb = shared_slab.clone();
133    let guard = read_queue
134        .run_handler(move |payload: Payload| {
135            if payload.flag & Payload::QUIT_ACK == Payload::QUIT_ACK {
136                return;
137            }
138            let Some(call_handle) = handlers.get(payload.call_id as usize) else {
139                panic!("call handler {} not found", payload.call_id);
140            };
141            let sid = payload.user_data;
142            let desc = {
143                #[cfg(not(all(feature = "tokio", not(feature = "monoio"))))]
144                let locked = unsafe { &mut *sb.get() };
145                #[cfg(all(feature = "tokio", not(feature = "monoio")))]
146                let mut locked = sb.lock().unwrap();
147                locked.remove(sid)
148            };
149
150            if call_handle(payload.ptr, desc) {
151                let drop_payload = Payload::new_drop(payload.call_id, payload.next_user_data);
152                wq.push(drop_payload);
153            }
154        })
155        .expect("unable to run ffi handler");
156    Box::leak(Box::new(guard));
157    (write_queue, shared_slab)
158}
159
160/// # Safety
161/// peer_init_function_pointer must be a valid function.
162#[cfg(not(all(feature = "tokio", not(feature = "monoio"))))]
163pub unsafe fn init_rings<T: 'static>(
164    peer_init_function_pointer: *const (),
165    size: usize,
166) -> Result<(ReadQueue<T>, WriteQueue<T>), io::Error> {
167    type RingInitFunc = unsafe extern "C" fn(QueueMeta, QueueMeta);
168    let (rqueue, rmeta) = Queue::new(size)?;
169    let (wqueue, wmeta) = Queue::new(size)?;
170
171    let init_func: RingInitFunc = std::mem::transmute(peer_init_function_pointer);
172    init_func(rmeta, wmeta);
173
174    Ok((rqueue.read(), wqueue.write()?))
175}
176
/// Creates the two queues used to talk to the peer and passes their metadata to the
/// peer's init function (variant requiring `T: Send`, selected when the `tokio` feature is
/// enabled without `monoio`).
///
/// Both queues are built with `Queue::new(size)`. The peer init function is called with the
/// metadata of the queue this side reads from first, and the metadata of the queue this
/// side writes to second. The read half of the former and the write half of the latter are
/// returned.
///
/// # Errors
/// Returns an `io::ErrorKind::InvalidInput` error if `peer_init_function_pointer` is null,
/// and propagates any error from `Queue::new` or from obtaining the write half.
///
/// # Safety
/// peer_init_function_pointer must be a valid function with the signature
/// `unsafe extern "C" fn(QueueMeta, QueueMeta)`. Only the null case is detected.
#[cfg(all(feature = "tokio", not(feature = "monoio")))]
pub unsafe fn init_rings<T: 'static + Send>(
    peer_init_function_pointer: *const (),
    size: usize,
) -> Result<(ReadQueue<T>, WriteQueue<T>), io::Error> {
    type RingInitFunc = unsafe extern "C" fn(QueueMeta, QueueMeta);

    // A function pointer must never be null; producing one through transmute would be
    // undefined behaviour, so reject it before allocating anything.
    if peer_init_function_pointer.is_null() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "peer init function pointer is null",
        ));
    }

    let (rqueue, rmeta) = Queue::new(size)?;
    let (wqueue, wmeta) = Queue::new(size)?;

    // SAFETY: the pointer is non-null (checked above); the caller guarantees it refers to
    // a function with the `RingInitFunc` signature.
    let init_func =
        std::mem::transmute::<*const (), RingInitFunc>(peer_init_function_pointer);
    init_func(rmeta, wmeta);

    Ok((rqueue.read(), wqueue.write()?))
}