rapace_cell/
lib.rs

1//! High-level cell runtime for rapace
2//!
3//! This crate provides boilerplate-free APIs for building rapace cells that communicate
4//! via SHM transport. It handles all the common setup that every cell needs:
5//!
//! - CLI argument parsing (`--shm-path=PATH` or the first positional argument)
7//! - Waiting for the host to create the SHM file
8//! - SHM session setup with standard configuration
9//! - RPC session creation with correct channel ID conventions
10//! - Service dispatcher setup
11//!
12//! # Single-service cells
13//!
14//! For simple cells that expose a single service:
15//!
16//! ```rust,ignore
17//! use rapace_cell::{run, ServiceDispatch};
18//! use rapace::{Frame, RpcError};
19//! use std::future::Future;
20//! use std::pin::Pin;
21//!
22//! # struct MyServiceServer;
23//! # impl MyServiceServer {
24//! #     fn new(impl_: ()) -> Self { Self }
25//! #     async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
26//! #         unimplemented!()
27//! #     }
28//! # }
29//! # impl ServiceDispatch for MyServiceServer {
30//! #     fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
31//! #         Box::pin(Self::dispatch_impl(self, method_id, payload))
32//! #     }
33//! # }
34//!
35//! #[tokio::main]
36//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
37//!     let server = MyServiceServer::new(());
38//!     run(server).await?;
39//!     Ok(())
40//! }
41//! ```
42//!
43//! # Multi-service cells
44//!
45//! For cells that expose multiple services:
46//!
47//! ```rust,ignore
48//! use rapace_cell::{run_multi, DispatcherBuilder, ServiceDispatch};
49//! use rapace::{Frame, RpcError};
50//! use std::future::Future;
51//! use std::pin::Pin;
52//!
53//! # struct MyServiceServer;
54//! # struct AnotherServiceServer;
55//! # impl MyServiceServer {
56//! #     fn new(impl_: ()) -> Self { Self }
57//! #     async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
58//! #         unimplemented!()
59//! #     }
60//! # }
61//! # impl AnotherServiceServer {
62//! #     fn new(impl_: ()) -> Self { Self }
63//! #     async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
64//! #         unimplemented!()
65//! #     }
66//! # }
67//! # impl ServiceDispatch for MyServiceServer {
68//! #     fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
69//! #         Box::pin(Self::dispatch_impl(self, method_id, payload))
70//! #     }
71//! # }
72//! # impl ServiceDispatch for AnotherServiceServer {
73//! #     fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
74//! #         Box::pin(Self::dispatch_impl(self, method_id, payload))
75//! #     }
76//! # }
77//!
78//! #[tokio::main]
79//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
80//!     run_multi(|builder| {
81//!         builder
82//!             .add_service(MyServiceServer::new(()))
83//!             .add_service(AnotherServiceServer::new(()))
84//!     }).await?;
85//!     Ok(())
86//! }
87//! ```
88//!
89//! # Configuration
90//!
91//! By default, the cell uses a standard SHM configuration that should match most hosts:
92//! - ring_capacity: 256 descriptors
93//! - slot_size: 64KB
94//! - slot_count: 128 slots (8MB total)
95//!
96//! The cell always uses even channel IDs starting from 2 (following rapace convention
97//! where cells use even IDs and hosts use odd IDs).
98
99use std::error::Error as StdError;
100use std::future::Future;
101use std::path::PathBuf;
102use std::pin::Pin;
103use std::sync::Arc;
104
105use rapace::transport::shm::{ShmSession, ShmSessionConfig, ShmTransport};
106use rapace::{Frame, RpcError, RpcSession, TransportError};
107
108/// Standard SHM configuration that should match most hosts
109pub const DEFAULT_SHM_CONFIG: ShmSessionConfig = ShmSessionConfig {
110    ring_capacity: 256, // 256 descriptors in flight
111    slot_size: 65536,   // 64KB per slot
112    slot_count: 128,    // 128 slots = 8MB total
113};
114
/// First channel ID handed to the RPC session on the cell side.
///
/// Cells use the even IDs (2, 4, 6, ...) while hosts use the odd ones
/// (1, 3, 5, ...).
const CELL_CHANNEL_START: u32 = 2;
118
119/// Error type for cell runtime operations
120#[derive(Debug)]
121pub enum CellError {
122    /// Failed to parse command line arguments
123    Args(String),
124    /// SHM file was not created by host within timeout
125    ShmTimeout(PathBuf),
126    /// Failed to open SHM session
127    ShmOpen(String),
128    /// RPC session error
129    Rpc(RpcError),
130    /// Transport error
131    Transport(TransportError),
132}
133
134impl std::fmt::Display for CellError {
135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        match self {
137            Self::Args(msg) => write!(f, "Argument error: {}", msg),
138            Self::ShmTimeout(path) => write!(f, "SHM file not created by host: {}", path.display()),
139            Self::ShmOpen(msg) => write!(f, "Failed to open SHM: {}", msg),
140            Self::Rpc(e) => write!(f, "RPC error: {:?}", e),
141            Self::Transport(e) => write!(f, "Transport error: {:?}", e),
142        }
143    }
144}
145
146impl StdError for CellError {}
147
148impl From<RpcError> for CellError {
149    fn from(e: RpcError) -> Self {
150        Self::Rpc(e)
151    }
152}
153
154impl From<TransportError> for CellError {
155    fn from(e: TransportError) -> Self {
156        Self::Transport(e)
157    }
158}
159
160/// Trait for service servers that can be dispatched
161pub trait ServiceDispatch: Send + Sync + 'static {
162    /// Dispatch a method call to this service
163    fn dispatch(
164        &self,
165        method_id: u32,
166        payload: &[u8],
167    ) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>>;
168}
169
170/// Builder for creating multi-service dispatchers
171pub struct DispatcherBuilder {
172    services: Vec<Box<dyn ServiceDispatch>>,
173}
174
175impl DispatcherBuilder {
176    /// Create a new dispatcher builder
177    pub fn new() -> Self {
178        Self {
179            services: Vec::new(),
180        }
181    }
182
183    /// Add a service to the dispatcher
184    pub fn add_service<S>(mut self, service: S) -> Self
185    where
186        S: ServiceDispatch,
187    {
188        self.services.push(Box::new(service));
189        self
190    }
191
192    /// Build the dispatcher function
193    #[allow(clippy::type_complexity)]
194    pub fn build(
195        self,
196    ) -> impl Fn(u32, u32, Vec<u8>) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send>>
197    + Send
198    + Sync
199    + 'static {
200        let services = Arc::new(self.services);
201        move |_channel_id, method_id, payload| {
202            let services = services.clone();
203            Box::pin(async move {
204                // Try each service in order until one doesn't return Unimplemented
205                for service in services.iter() {
206                    let result = service.dispatch(method_id, &payload).await;
207
208                    // If not "unknown method_id", return the result
209                    if !matches!(
210                        &result,
211                        Err(RpcError::Status {
212                            code: rapace::ErrorCode::Unimplemented,
213                            ..
214                        })
215                    ) {
216                        return result;
217                    }
218                }
219
220                // No service handled this method
221                Err(RpcError::Status {
222                    code: rapace::ErrorCode::Unimplemented,
223                    message: format!("Unknown method_id: {}", method_id),
224                })
225            })
226        }
227    }
228}
229
230impl Default for DispatcherBuilder {
231    fn default() -> Self {
232        Self::new()
233    }
234}
235
236/// Parse CLI arguments to extract SHM path
237fn parse_args() -> Result<PathBuf, CellError> {
238    for arg in std::env::args().skip(1) {
239        if let Some(value) = arg.strip_prefix("--shm-path=") {
240            return Ok(PathBuf::from(value));
241        } else if !arg.starts_with("--") {
242            // First positional argument
243            return Ok(PathBuf::from(arg));
244        }
245    }
246    Err(CellError::Args(
247        "Missing SHM path (use --shm-path=PATH or provide as first argument)".to_string(),
248    ))
249}
250
251/// Wait for the host to create the SHM file
252async fn wait_for_shm(path: &std::path::Path, timeout_ms: u64) -> Result<(), CellError> {
253    let attempts = timeout_ms / 100;
254    for i in 0..attempts {
255        if path.exists() {
256            return Ok(());
257        }
258        if i < attempts - 1 {
259            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
260        }
261    }
262    Err(CellError::ShmTimeout(path.to_path_buf()))
263}
264
265/// Setup common cell infrastructure
266async fn setup_cell(
267    config: ShmSessionConfig,
268) -> Result<(Arc<RpcSession<ShmTransport>>, PathBuf), CellError> {
269    // Parse CLI args
270    let shm_path = parse_args()?;
271
272    // Wait for host to create SHM file (5 second timeout)
273    wait_for_shm(&shm_path, 5000).await?;
274
275    // Open the SHM session
276    let shm_session = ShmSession::open_file(&shm_path, config)
277        .map_err(|e| CellError::ShmOpen(format!("{:?}", e)))?;
278
279    // Create SHM transport
280    let transport = Arc::new(ShmTransport::new(shm_session));
281
282    // Create RPC session with cell channel start (even IDs)
283    let session = Arc::new(RpcSession::with_channel_start(
284        transport,
285        CELL_CHANNEL_START,
286    ));
287
288    Ok((session, shm_path))
289}
290
291/// Run a single-service cell
292///
293/// This function handles all the boilerplate for a simple cell:
294/// - Parses CLI arguments
295/// - Waits for SHM file creation
296/// - Sets up SHM transport and RPC session
297/// - Configures the service dispatcher
298/// - Runs the session loop
299///
300/// # Example
301///
302/// ```rust,ignore
303/// use rapace_cell::{run, ServiceDispatch};
304/// use rapace::{Frame, RpcError};
305/// use std::future::Future;
306/// use std::pin::Pin;
307///
308/// # struct MyServiceServer;
309/// # impl MyServiceServer {
310/// #     fn new(impl_: ()) -> Self { Self }
311/// #     async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
312/// #         unimplemented!()
313/// #     }
314/// # }
315/// # impl ServiceDispatch for MyServiceServer {
316/// #     fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
317/// #         Box::pin(Self::dispatch_impl(self, method_id, payload))
318/// #     }
319/// # }
320///
321/// #[tokio::main]
322/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
323///     let server = MyServiceServer::new(());
324///     run(server).await?;
325///     Ok(())
326/// }
327/// ```
328pub async fn run<S>(service: S) -> Result<(), CellError>
329where
330    S: ServiceDispatch,
331{
332    run_with_config(service, DEFAULT_SHM_CONFIG).await
333}
334
335/// Run a single-service cell with custom SHM configuration
336pub async fn run_with_config<S>(service: S, config: ShmSessionConfig) -> Result<(), CellError>
337where
338    S: ServiceDispatch,
339{
340    let (session, shm_path) = setup_cell(config).await?;
341
342    tracing::info!("Connected to host via SHM: {}", shm_path.display());
343
344    // Set up single-service dispatcher
345    let dispatcher = {
346        let service = Arc::new(service);
347        move |_channel_id: u32, method_id: u32, payload: Vec<u8>| {
348            let service = service.clone();
349            Box::pin(async move { service.dispatch(method_id, &payload).await })
350                as Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send>>
351        }
352    };
353
354    session.set_dispatcher(dispatcher);
355
356    // Run the session loop
357    session.run().await?;
358
359    Ok(())
360}
361
362/// Run a multi-service cell
363///
364/// This function handles all the boilerplate for a multi-service cell.
365/// The builder function receives a `DispatcherBuilder` to configure which
366/// services the cell exposes.
367///
368/// # Example
369///
370/// ```rust,ignore
371/// use rapace_cell::{run_multi, DispatcherBuilder, ServiceDispatch};
372/// use rapace::{Frame, RpcError};
373/// use std::future::Future;
374/// use std::pin::Pin;
375///
376/// # struct MyServiceServer;
377/// # struct AnotherServiceServer;
378/// # impl MyServiceServer {
379/// #     fn new(impl_: ()) -> Self { Self }
380/// #     async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
381/// #         unimplemented!()
382/// #     }
383/// # }
384/// # impl AnotherServiceServer {
385/// #     fn new(impl_: ()) -> Self { Self }
386/// #     async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
387/// #         unimplemented!()
388/// #     }
389/// # }
390/// # impl ServiceDispatch for MyServiceServer {
391/// #     fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
392/// #         Box::pin(Self::dispatch_impl(self, method_id, payload))
393/// #     }
394/// # }
395/// # impl ServiceDispatch for AnotherServiceServer {
396/// #     fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
397/// #         Box::pin(Self::dispatch_impl(self, method_id, payload))
398/// #     }
399/// # }
400///
401/// #[tokio::main]
402/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
403///     run_multi(|builder| {
404///         builder
405///             .add_service(MyServiceServer::new(()))
406///             .add_service(AnotherServiceServer::new(()))
407///     }).await?;
408///     Ok(())
409/// }
410/// ```
411pub async fn run_multi<F>(builder_fn: F) -> Result<(), CellError>
412where
413    F: FnOnce(DispatcherBuilder) -> DispatcherBuilder,
414{
415    run_multi_with_config(builder_fn, DEFAULT_SHM_CONFIG).await
416}
417
418/// Run a multi-service cell with custom SHM configuration
419pub async fn run_multi_with_config<F>(
420    builder_fn: F,
421    config: ShmSessionConfig,
422) -> Result<(), CellError>
423where
424    F: FnOnce(DispatcherBuilder) -> DispatcherBuilder,
425{
426    let (session, shm_path) = setup_cell(config).await?;
427
428    tracing::info!("Connected to host via SHM: {}", shm_path.display());
429
430    // Build the dispatcher
431    let builder = DispatcherBuilder::new();
432    let builder = builder_fn(builder);
433    let dispatcher = builder.build();
434
435    session.set_dispatcher(dispatcher);
436
437    // Run the session loop
438    session.run().await?;
439
440    Ok(())
441}
442
443/// Extension trait for RpcSession to support single-service setup
444pub trait RpcSessionExt<T> {
445    /// Set a single service as the dispatcher for this session
446    ///
447    /// This is a convenience method for cells that only expose one service.
448    /// For multi-service cells, use `set_dispatcher` with a `DispatcherBuilder`.
449    fn set_service<S>(&self, service: S)
450    where
451        S: ServiceDispatch;
452}
453
454impl<T> RpcSessionExt<T> for RpcSession<T>
455where
456    T: rapace::Transport + 'static,
457{
458    fn set_service<S>(&self, service: S)
459    where
460        S: ServiceDispatch,
461    {
462        let service = Arc::new(service);
463        let dispatcher = move |_channel_id: u32, method_id: u32, payload: Vec<u8>| {
464            let service = service.clone();
465            Box::pin(async move { service.dispatch(method_id, &payload).await })
466                as Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send>>
467        };
468        self.set_dispatcher(dispatcher);
469    }
470}