rapace_cell/lib.rs
1//! High-level cell runtime for rapace
2//!
3//! This crate provides boilerplate-free APIs for building rapace cells that communicate
4//! via SHM transport. It handles all the common setup that every cell needs:
5//!
//! - CLI argument parsing (`--shm-path=PATH` or the first positional argument)
7//! - Waiting for the host to create the SHM file
8//! - SHM session setup with standard configuration
9//! - RPC session creation with correct channel ID conventions
10//! - Service dispatcher setup
11//!
12//! # Single-service cells
13//!
14//! For simple cells that expose a single service:
15//!
16//! ```rust,ignore
17//! use rapace_cell::{run, ServiceDispatch};
18//! use rapace::{Frame, RpcError};
19//! use std::future::Future;
20//! use std::pin::Pin;
21//!
22//! # struct MyServiceServer;
23//! # impl MyServiceServer {
24//! # fn new(impl_: ()) -> Self { Self }
25//! # async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
26//! # unimplemented!()
27//! # }
28//! # }
29//! # impl ServiceDispatch for MyServiceServer {
30//! # fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
31//! # Box::pin(Self::dispatch_impl(self, method_id, payload))
32//! # }
33//! # }
34//!
35//! #[tokio::main]
36//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
37//! let server = MyServiceServer::new(());
38//! run(server).await?;
39//! Ok(())
40//! }
41//! ```
42//!
43//! # Multi-service cells
44//!
45//! For cells that expose multiple services:
46//!
47//! ```rust,ignore
48//! use rapace_cell::{run_multi, DispatcherBuilder, ServiceDispatch};
49//! use rapace::{Frame, RpcError};
50//! use std::future::Future;
51//! use std::pin::Pin;
52//!
53//! # struct MyServiceServer;
54//! # struct AnotherServiceServer;
55//! # impl MyServiceServer {
56//! # fn new(impl_: ()) -> Self { Self }
57//! # async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
58//! # unimplemented!()
59//! # }
60//! # }
61//! # impl AnotherServiceServer {
62//! # fn new(impl_: ()) -> Self { Self }
63//! # async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
64//! # unimplemented!()
65//! # }
66//! # }
67//! # impl ServiceDispatch for MyServiceServer {
68//! # fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
69//! # Box::pin(Self::dispatch_impl(self, method_id, payload))
70//! # }
71//! # }
72//! # impl ServiceDispatch for AnotherServiceServer {
73//! # fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
74//! # Box::pin(Self::dispatch_impl(self, method_id, payload))
75//! # }
76//! # }
77//!
78//! #[tokio::main]
79//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
80//! run_multi(|builder| {
81//! builder
82//! .add_service(MyServiceServer::new(()))
83//! .add_service(AnotherServiceServer::new(()))
84//! }).await?;
85//! Ok(())
86//! }
87//! ```
88//!
89//! # Configuration
90//!
91//! By default, the cell uses a standard SHM configuration that should match most hosts:
92//! - ring_capacity: 256 descriptors
93//! - slot_size: 64KB
94//! - slot_count: 128 slots (8MB total)
95//!
96//! The cell always uses even channel IDs starting from 2 (following rapace convention
97//! where cells use even IDs and hosts use odd IDs).
98
99use std::error::Error as StdError;
100use std::future::Future;
101use std::path::PathBuf;
102use std::pin::Pin;
103use std::sync::Arc;
104
105use rapace::transport::shm::{ShmSession, ShmSessionConfig, ShmTransport};
106use rapace::{Frame, RpcError, RpcSession, TransportError};
107
108/// Standard SHM configuration that should match most hosts
109pub const DEFAULT_SHM_CONFIG: ShmSessionConfig = ShmSessionConfig {
110 ring_capacity: 256, // 256 descriptors in flight
111 slot_size: 65536, // 64KB per slot
112 slot_count: 128, // 128 slots = 8MB total
113};
114
/// First channel ID handed out on the cell side of a session.
///
/// By rapace convention cells allocate even channel IDs (2, 4, 6, ...) while
/// hosts allocate odd ones (1, 3, 5, ...).
const CELL_CHANNEL_START: u32 = 2;
118
119/// Error type for cell runtime operations
120#[derive(Debug)]
121pub enum CellError {
122 /// Failed to parse command line arguments
123 Args(String),
124 /// SHM file was not created by host within timeout
125 ShmTimeout(PathBuf),
126 /// Failed to open SHM session
127 ShmOpen(String),
128 /// RPC session error
129 Rpc(RpcError),
130 /// Transport error
131 Transport(TransportError),
132}
133
134impl std::fmt::Display for CellError {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 match self {
137 Self::Args(msg) => write!(f, "Argument error: {}", msg),
138 Self::ShmTimeout(path) => write!(f, "SHM file not created by host: {}", path.display()),
139 Self::ShmOpen(msg) => write!(f, "Failed to open SHM: {}", msg),
140 Self::Rpc(e) => write!(f, "RPC error: {:?}", e),
141 Self::Transport(e) => write!(f, "Transport error: {:?}", e),
142 }
143 }
144}
145
146impl StdError for CellError {}
147
148impl From<RpcError> for CellError {
149 fn from(e: RpcError) -> Self {
150 Self::Rpc(e)
151 }
152}
153
154impl From<TransportError> for CellError {
155 fn from(e: TransportError) -> Self {
156 Self::Transport(e)
157 }
158}
159
160/// Trait for service servers that can be dispatched
161pub trait ServiceDispatch: Send + Sync + 'static {
162 /// Dispatch a method call to this service
163 fn dispatch(
164 &self,
165 method_id: u32,
166 payload: &[u8],
167 ) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>>;
168}
169
170/// Builder for creating multi-service dispatchers
171pub struct DispatcherBuilder {
172 services: Vec<Box<dyn ServiceDispatch>>,
173}
174
175impl DispatcherBuilder {
176 /// Create a new dispatcher builder
177 pub fn new() -> Self {
178 Self {
179 services: Vec::new(),
180 }
181 }
182
183 /// Add a service to the dispatcher
184 pub fn add_service<S>(mut self, service: S) -> Self
185 where
186 S: ServiceDispatch,
187 {
188 self.services.push(Box::new(service));
189 self
190 }
191
192 /// Build the dispatcher function
193 #[allow(clippy::type_complexity)]
194 pub fn build(
195 self,
196 ) -> impl Fn(u32, u32, Vec<u8>) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send>>
197 + Send
198 + Sync
199 + 'static {
200 let services = Arc::new(self.services);
201 move |_channel_id, method_id, payload| {
202 let services = services.clone();
203 Box::pin(async move {
204 // Try each service in order until one doesn't return Unimplemented
205 for service in services.iter() {
206 let result = service.dispatch(method_id, &payload).await;
207
208 // If not "unknown method_id", return the result
209 if !matches!(
210 &result,
211 Err(RpcError::Status {
212 code: rapace::ErrorCode::Unimplemented,
213 ..
214 })
215 ) {
216 return result;
217 }
218 }
219
220 // No service handled this method
221 Err(RpcError::Status {
222 code: rapace::ErrorCode::Unimplemented,
223 message: format!("Unknown method_id: {}", method_id),
224 })
225 })
226 }
227 }
228}
229
230impl Default for DispatcherBuilder {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236/// Parse CLI arguments to extract SHM path
237fn parse_args() -> Result<PathBuf, CellError> {
238 for arg in std::env::args().skip(1) {
239 if let Some(value) = arg.strip_prefix("--shm-path=") {
240 return Ok(PathBuf::from(value));
241 } else if !arg.starts_with("--") {
242 // First positional argument
243 return Ok(PathBuf::from(arg));
244 }
245 }
246 Err(CellError::Args(
247 "Missing SHM path (use --shm-path=PATH or provide as first argument)".to_string(),
248 ))
249}
250
251/// Wait for the host to create the SHM file
252async fn wait_for_shm(path: &std::path::Path, timeout_ms: u64) -> Result<(), CellError> {
253 let attempts = timeout_ms / 100;
254 for i in 0..attempts {
255 if path.exists() {
256 return Ok(());
257 }
258 if i < attempts - 1 {
259 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
260 }
261 }
262 Err(CellError::ShmTimeout(path.to_path_buf()))
263}
264
265/// Setup common cell infrastructure
266async fn setup_cell(
267 config: ShmSessionConfig,
268) -> Result<(Arc<RpcSession<ShmTransport>>, PathBuf), CellError> {
269 // Parse CLI args
270 let shm_path = parse_args()?;
271
272 // Wait for host to create SHM file (5 second timeout)
273 wait_for_shm(&shm_path, 5000).await?;
274
275 // Open the SHM session
276 let shm_session = ShmSession::open_file(&shm_path, config)
277 .map_err(|e| CellError::ShmOpen(format!("{:?}", e)))?;
278
279 // Create SHM transport
280 let transport = Arc::new(ShmTransport::new(shm_session));
281
282 // Create RPC session with cell channel start (even IDs)
283 let session = Arc::new(RpcSession::with_channel_start(
284 transport,
285 CELL_CHANNEL_START,
286 ));
287
288 Ok((session, shm_path))
289}
290
291/// Run a single-service cell
292///
293/// This function handles all the boilerplate for a simple cell:
294/// - Parses CLI arguments
295/// - Waits for SHM file creation
296/// - Sets up SHM transport and RPC session
297/// - Configures the service dispatcher
298/// - Runs the session loop
299///
300/// # Example
301///
302/// ```rust,ignore
303/// use rapace_cell::{run, ServiceDispatch};
304/// use rapace::{Frame, RpcError};
305/// use std::future::Future;
306/// use std::pin::Pin;
307///
308/// # struct MyServiceServer;
309/// # impl MyServiceServer {
310/// # fn new(impl_: ()) -> Self { Self }
311/// # async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
312/// # unimplemented!()
313/// # }
314/// # }
315/// # impl ServiceDispatch for MyServiceServer {
316/// # fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
317/// # Box::pin(Self::dispatch_impl(self, method_id, payload))
318/// # }
319/// # }
320///
321/// #[tokio::main]
322/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
323/// let server = MyServiceServer::new(());
324/// run(server).await?;
325/// Ok(())
326/// }
327/// ```
328pub async fn run<S>(service: S) -> Result<(), CellError>
329where
330 S: ServiceDispatch,
331{
332 run_with_config(service, DEFAULT_SHM_CONFIG).await
333}
334
335/// Run a single-service cell with custom SHM configuration
336pub async fn run_with_config<S>(service: S, config: ShmSessionConfig) -> Result<(), CellError>
337where
338 S: ServiceDispatch,
339{
340 let (session, shm_path) = setup_cell(config).await?;
341
342 tracing::info!("Connected to host via SHM: {}", shm_path.display());
343
344 // Set up single-service dispatcher
345 let dispatcher = {
346 let service = Arc::new(service);
347 move |_channel_id: u32, method_id: u32, payload: Vec<u8>| {
348 let service = service.clone();
349 Box::pin(async move { service.dispatch(method_id, &payload).await })
350 as Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send>>
351 }
352 };
353
354 session.set_dispatcher(dispatcher);
355
356 // Run the session loop
357 session.run().await?;
358
359 Ok(())
360}
361
362/// Run a multi-service cell
363///
364/// This function handles all the boilerplate for a multi-service cell.
365/// The builder function receives a `DispatcherBuilder` to configure which
366/// services the cell exposes.
367///
368/// # Example
369///
370/// ```rust,ignore
371/// use rapace_cell::{run_multi, DispatcherBuilder, ServiceDispatch};
372/// use rapace::{Frame, RpcError};
373/// use std::future::Future;
374/// use std::pin::Pin;
375///
376/// # struct MyServiceServer;
377/// # struct AnotherServiceServer;
378/// # impl MyServiceServer {
379/// # fn new(impl_: ()) -> Self { Self }
380/// # async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
381/// # unimplemented!()
382/// # }
383/// # }
384/// # impl AnotherServiceServer {
385/// # fn new(impl_: ()) -> Self { Self }
386/// # async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
387/// # unimplemented!()
388/// # }
389/// # }
390/// # impl ServiceDispatch for MyServiceServer {
391/// # fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
392/// # Box::pin(Self::dispatch_impl(self, method_id, payload))
393/// # }
394/// # }
395/// # impl ServiceDispatch for AnotherServiceServer {
396/// # fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
397/// # Box::pin(Self::dispatch_impl(self, method_id, payload))
398/// # }
399/// # }
400///
401/// #[tokio::main]
402/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
403/// run_multi(|builder| {
404/// builder
405/// .add_service(MyServiceServer::new(()))
406/// .add_service(AnotherServiceServer::new(()))
407/// }).await?;
408/// Ok(())
409/// }
410/// ```
411pub async fn run_multi<F>(builder_fn: F) -> Result<(), CellError>
412where
413 F: FnOnce(DispatcherBuilder) -> DispatcherBuilder,
414{
415 run_multi_with_config(builder_fn, DEFAULT_SHM_CONFIG).await
416}
417
418/// Run a multi-service cell with custom SHM configuration
419pub async fn run_multi_with_config<F>(
420 builder_fn: F,
421 config: ShmSessionConfig,
422) -> Result<(), CellError>
423where
424 F: FnOnce(DispatcherBuilder) -> DispatcherBuilder,
425{
426 let (session, shm_path) = setup_cell(config).await?;
427
428 tracing::info!("Connected to host via SHM: {}", shm_path.display());
429
430 // Build the dispatcher
431 let builder = DispatcherBuilder::new();
432 let builder = builder_fn(builder);
433 let dispatcher = builder.build();
434
435 session.set_dispatcher(dispatcher);
436
437 // Run the session loop
438 session.run().await?;
439
440 Ok(())
441}
442
443/// Extension trait for RpcSession to support single-service setup
444pub trait RpcSessionExt<T> {
445 /// Set a single service as the dispatcher for this session
446 ///
447 /// This is a convenience method for cells that only expose one service.
448 /// For multi-service cells, use `set_dispatcher` with a `DispatcherBuilder`.
449 fn set_service<S>(&self, service: S)
450 where
451 S: ServiceDispatch;
452}
453
454impl<T> RpcSessionExt<T> for RpcSession<T>
455where
456 T: rapace::Transport + 'static,
457{
458 fn set_service<S>(&self, service: S)
459 where
460 S: ServiceDispatch,
461 {
462 let service = Arc::new(service);
463 let dispatcher = move |_channel_id: u32, method_id: u32, payload: Vec<u8>| {
464 let service = service.clone();
465 Box::pin(async move { service.dispatch(method_id, &payload).await })
466 as Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send>>
467 };
468 self.set_dispatcher(dispatcher);
469 }
470}