rapace_cell/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::error::Error as StdError;
4use std::future::Future;
5use std::path::PathBuf;
6use std::pin::Pin;
7use std::sync::Arc;
8
9use rapace::transport::shm::{ShmSession, ShmSessionConfig, ShmTransport};
10use rapace::{Transport, TransportError};
11
12// Re-export common rapace types so macro-expanded code can refer to `$crate::...`
13// without requiring every cell crate to depend on `rapace` directly.
14pub use rapace::{Frame, RpcError, RpcSession};
15
16pub mod lifecycle;
17pub use lifecycle::{CellLifecycle, CellLifecycleClient, CellLifecycleServer, ReadyAck, ReadyMsg};
18
19pub mod tracing_setup;
20pub use tracing_setup::TracingConfigService;
21
22#[cfg(unix)]
23use rapace::transport::shm::{Doorbell, HubPeer};
24#[cfg(unix)]
25use std::os::unix::io::RawFd;
26
/// Whether the cell should suppress its own stderr status chatter.
///
/// Quiet mode is on when either `RAPACE_QUIET` (generic) or `DODECA_QUIET`
/// (dodeca-specific) is set to anything other than empty, `0`, or `false`
/// (case-insensitive).
fn quiet_mode_enabled() -> bool {
    /// An env var is "truthy" when present and not one of the falsy spellings.
    fn is_truthy(name: &str) -> bool {
        let Some(raw) = std::env::var_os(name) else {
            return false;
        };
        let text = raw.to_string_lossy();
        let is_falsy = text.is_empty() || text == "0" || text.eq_ignore_ascii_case("false");
        !is_falsy
    }

    // Honor both the generic knob and the dodeca-specific one (first match wins).
    ["RAPACE_QUIET", "DODECA_QUIET"]
        .iter()
        .any(|name| is_truthy(name))
}
41
42/// Default SHM configuration for two-peer sessions.
43///
44/// Designed for typical host-cell communication with moderate payloads.
45/// Total memory per session: ~8.5MB (2 × 17KB rings + 8MB data segment).
46///
47/// See module documentation for customization guidelines.
48pub const DEFAULT_SHM_CONFIG: ShmSessionConfig = ShmSessionConfig {
49    ring_capacity: 256, // 256 in-flight descriptors per direction
50    slot_size: 65536,   // 64KB max payload per slot
51    slot_count: 128,    // 128 slots = 8MB total data segment
52};
53
54/// Channel ID start for cells (even IDs: 2, 4, 6, ...)
55/// Hosts use odd IDs (1, 3, 5, ...)
56const CELL_CHANNEL_START: u32 = 2;
57
58/// Error type for cell runtime operations
59#[derive(Debug)]
60pub enum CellError {
61    /// Failed to parse command line arguments
62    Args(String),
63    /// SHM file was not created by host within timeout
64    ShmTimeout(PathBuf),
65    /// Hub file was not created by host within timeout
66    HubTimeout(PathBuf),
67    /// Failed to open SHM session
68    ShmOpen(String),
69    /// Failed to open hub session
70    HubOpen(String),
71    /// Missing or invalid hub arguments
72    HubArgs(String),
73    /// Doorbell fd invalid
74    DoorbellFd(String),
75    /// RPC session error
76    Rpc(RpcError),
77    /// Transport error
78    Transport(TransportError),
79}
80
81impl std::fmt::Display for CellError {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        match self {
84            Self::Args(msg) => write!(f, "Argument error: {}", msg),
85            Self::ShmTimeout(path) => write!(f, "SHM file not created by host: {}", path.display()),
86            Self::HubTimeout(path) => write!(f, "Hub file not created by host: {}", path.display()),
87            Self::ShmOpen(msg) => write!(f, "Failed to open SHM: {}", msg),
88            Self::HubOpen(msg) => write!(f, "Failed to open hub: {}", msg),
89            Self::HubArgs(msg) => write!(f, "Hub argument error: {}", msg),
90            Self::DoorbellFd(msg) => write!(f, "Doorbell fd error: {}", msg),
91            Self::Rpc(e) => write!(f, "RPC error: {:?}", e),
92            Self::Transport(e) => write!(f, "Transport error: {:?}", e),
93        }
94    }
95}
96
97impl StdError for CellError {}
98
99impl From<RpcError> for CellError {
100    fn from(e: RpcError) -> Self {
101        Self::Rpc(e)
102    }
103}
104
105impl From<TransportError> for CellError {
106    fn from(e: TransportError) -> Self {
107        Self::Transport(e)
108    }
109}
110
111/// Trait for service servers that can be dispatched
112pub trait ServiceDispatch: Send + Sync + 'static {
113    /// Dispatch a method call to this service
114    fn dispatch(
115        &self,
116        method_id: u32,
117        payload: &[u8],
118    ) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>>;
119}
120
121/// Builder for creating multi-service dispatchers
122pub struct DispatcherBuilder {
123    services: Vec<Box<dyn ServiceDispatch>>,
124}
125
126impl DispatcherBuilder {
127    /// Create a new dispatcher builder
128    pub fn new() -> Self {
129        Self {
130            services: Vec::new(),
131        }
132    }
133
134    /// Add a service to the dispatcher
135    pub fn add_service<S>(mut self, service: S) -> Self
136    where
137        S: ServiceDispatch,
138    {
139        self.services.push(Box::new(service));
140        self
141    }
142
143    /// Add service introspection to this cell.
144    ///
145    /// This exposes the `ServiceIntrospection` service, allowing callers to
146    /// query what services and methods this cell provides at runtime.
147    ///
148    /// # Example
149    ///
150    /// ```ignore
151    /// use rapace_cell::run_multi;
152    ///
153    /// run_multi(|builder| {
154    ///     builder
155    ///         .add_service(MyServiceServer::new(my_impl))
156    ///         .with_introspection() // ← Add introspection!
157    /// }).await?;
158    /// ```
159    #[cfg(feature = "introspection")]
160    pub fn with_introspection(self) -> Self {
161        use rapace_introspection::{DefaultServiceIntrospection, ServiceIntrospectionServer};
162
163        let introspection = DefaultServiceIntrospection::new();
164        let server = Arc::new(ServiceIntrospectionServer::new(introspection));
165
166        // Wrap the generated server to implement ServiceDispatch
167        struct IntrospectionDispatcher(
168            Arc<ServiceIntrospectionServer<DefaultServiceIntrospection>>,
169        );
170
171        impl ServiceDispatch for IntrospectionDispatcher {
172            fn dispatch(
173                &self,
174                method_id: u32,
175                payload: &[u8],
176            ) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>>
177            {
178                // Clone payload and capture server Arc for the future
179                let payload_owned = payload.to_vec();
180                let server = self.0.clone();
181                Box::pin(async move { server.dispatch(method_id, &payload_owned).await })
182            }
183        }
184
185        self.add_service(IntrospectionDispatcher(server))
186    }
187
188    /// Build the dispatcher function
189    #[allow(clippy::type_complexity)]
190    pub fn build(
191        self,
192    ) -> impl Fn(Frame) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send>>
193    + Send
194    + Sync
195    + 'static {
196        let services = Arc::new(self.services);
197        move |request: Frame| {
198            let services = services.clone();
199            Box::pin(async move {
200                let method_id = request.desc.method_id;
201                let payload = request.payload_bytes();
202
203                // Try each service in order until one doesn't return Unimplemented
204                for service in services.iter() {
205                    let result = service.dispatch(method_id, payload).await;
206
207                    // If not "unknown method_id", return the result
208                    if !matches!(
209                        &result,
210                        Err(RpcError::Status {
211                            code: rapace::ErrorCode::Unimplemented,
212                            ..
213                        })
214                    ) {
215                        let mut response = result?;
216                        response.desc.channel_id = request.desc.channel_id;
217                        response.desc.msg_id = request.desc.msg_id;
218                        return Ok(response);
219                    }
220                }
221
222                // No service handled this method - use registry for better error message
223                let error_msg = rapace_registry::ServiceRegistry::with_global(|reg| {
224                    if let Some(method) = reg.method_by_id(rapace_registry::MethodId(method_id)) {
225                        format!(
226                            "Method '{}' (id={}) exists in registry but is not implemented by any service in this cell",
227                            method.full_name, method_id
228                        )
229                    } else {
230                        format!(
231                            "Unknown method_id: {} (not registered in global registry)",
232                            method_id
233                        )
234                    }
235                });
236
237                Err(RpcError::Status {
238                    code: rapace::ErrorCode::Unimplemented,
239                    message: error_msg,
240                })
241            })
242        }
243    }
244}
245
246impl Default for DispatcherBuilder {
247    fn default() -> Self {
248        Self::new()
249    }
250}
251
252enum ParsedArgs {
253    Pair {
254        shm_path: PathBuf,
255    },
256    #[cfg(unix)]
257    Hub {
258        hub_path: PathBuf,
259        peer_id: u16,
260        doorbell_fd: RawFd,
261    },
262}
263
264/// Parse CLI arguments to extract either SHM pair args or hub args.
265fn parse_args() -> Result<ParsedArgs, CellError> {
266    let mut shm_path: Option<PathBuf> = None;
267    let mut hub_path: Option<PathBuf> = None;
268    let mut peer_id: Option<u16> = None;
269    #[cfg(unix)]
270    let mut doorbell_fd: Option<RawFd> = None;
271
272    for arg in std::env::args().skip(1) {
273        if let Some(value) = arg.strip_prefix("--shm-path=") {
274            shm_path = Some(PathBuf::from(value));
275        } else if let Some(value) = arg.strip_prefix("--hub-path=") {
276            hub_path = Some(PathBuf::from(value));
277        } else if let Some(value) = arg.strip_prefix("--peer-id=") {
278            peer_id = value.parse::<u16>().ok();
279        } else if let Some(value) = arg.strip_prefix("--doorbell-fd=") {
280            #[cfg(unix)]
281            {
282                doorbell_fd = value.parse::<i32>().ok();
283            }
284        } else if !arg.starts_with("--") && shm_path.is_none() && hub_path.is_none() {
285            // First positional argument defaults to shm-path for backwards compat.
286            shm_path = Some(PathBuf::from(arg));
287        }
288    }
289
290    if let Some(hub_path) = hub_path {
291        #[cfg(not(unix))]
292        {
293            return Err(CellError::HubArgs(
294                "hub mode is only supported on unix platforms".to_string(),
295            ));
296        }
297
298        #[cfg(unix)]
299        {
300            let peer_id = peer_id
301                .ok_or_else(|| CellError::HubArgs("Missing --peer-id for hub mode".to_string()))?;
302            let doorbell_fd = doorbell_fd.ok_or_else(|| {
303                CellError::HubArgs("Missing --doorbell-fd for hub mode".to_string())
304            })?;
305            return Ok(ParsedArgs::Hub {
306                hub_path,
307                peer_id,
308                doorbell_fd,
309            });
310        }
311    }
312
313    if let Some(shm_path) = shm_path {
314        return Ok(ParsedArgs::Pair { shm_path });
315    }
316
317    Err(CellError::Args(
318        "Missing SHM path (use --shm-path=PATH or provide as first argument)".to_string(),
319    ))
320}
321
322/// Wait for the host to create the SHM file
323async fn wait_for_shm(path: &std::path::Path, timeout_ms: u64) -> Result<(), CellError> {
324    let attempts = timeout_ms / 100;
325    for i in 0..attempts {
326        if path.exists() {
327            return Ok(());
328        }
329        if i < attempts - 1 {
330            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
331        }
332    }
333    Err(CellError::ShmTimeout(path.to_path_buf()))
334}
335
336async fn wait_for_hub(path: &std::path::Path, timeout_ms: u64) -> Result<(), CellError> {
337    let attempts = timeout_ms / 100;
338    for i in 0..attempts {
339        if path.exists() {
340            return Ok(());
341        }
342        if i < attempts - 1 {
343            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
344        }
345    }
346    Err(CellError::HubTimeout(path.to_path_buf()))
347}
348
349#[cfg(unix)]
350fn validate_doorbell_fd(fd: RawFd) -> Result<(), CellError> {
351    let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
352    if flags < 0 {
353        return Err(CellError::DoorbellFd(format!(
354            "doorbell fd {fd} is invalid: {}",
355            std::io::Error::last_os_error()
356        )));
357    }
358    Ok(())
359}
360
/// Best-effort name for this cell: the file stem of the running executable
/// (e.g. `/usr/bin/my-cell` → `my-cell`), or `"cell"` when it cannot be determined.
fn cell_name_guess() -> String {
    const FALLBACK: &str = "cell";
    match std::env::current_exe() {
        Ok(exe) => match exe.file_stem() {
            Some(stem) => stem.to_string_lossy().into_owned(),
            None => FALLBACK.to_string(),
        },
        Err(_) => FALLBACK.to_string(),
    }
}
367
368/// Cell setup result with optional peer_id for hub mode
369struct CellSetup {
370    session: Arc<RpcSession>,
371    #[allow(dead_code)]
372    path: PathBuf,
373    /// peer_id is Some when in hub mode, indicating ready signal should be sent
374    peer_id: Option<u16>,
375}
376
377/// Setup common cell infrastructure
378async fn setup_cell(config: ShmSessionConfig) -> Result<CellSetup, CellError> {
379    // Install death-watch: cell will exit if parent dies
380    // Required on macOS for ur-taking-me-with-you to work
381    ur_taking_me_with_you::die_with_parent();
382
383    match parse_args()? {
384        ParsedArgs::Pair { shm_path } => {
385            wait_for_shm(&shm_path, 5000).await?;
386
387            let shm_session = ShmSession::open_file(&shm_path, config)
388                .map_err(|e| CellError::ShmOpen(format!("{:?}", e)))?;
389
390            let transport = Transport::Shm(ShmTransport::new(shm_session));
391            let session = Arc::new(RpcSession::with_channel_start(
392                transport,
393                CELL_CHANNEL_START,
394            ));
395            Ok(CellSetup {
396                session,
397                path: shm_path,
398                peer_id: None,
399            })
400        }
401        #[cfg(unix)]
402        ParsedArgs::Hub {
403            hub_path,
404            peer_id,
405            doorbell_fd,
406        } => {
407            wait_for_hub(&hub_path, 5000).await?;
408            validate_doorbell_fd(doorbell_fd)?;
409
410            let peer = HubPeer::open(&hub_path, peer_id)
411                .map_err(|e| CellError::HubOpen(format!("{:?}", e)))?;
412            peer.register();
413
414            let doorbell = Doorbell::from_raw_fd(doorbell_fd)
415                .map_err(|e| CellError::DoorbellFd(format!("{:?}", e)))?;
416
417            let transport = Transport::Shm(ShmTransport::hub_peer(
418                Arc::new(peer),
419                doorbell,
420                cell_name_guess(),
421            ));
422
423            let session = Arc::new(RpcSession::with_channel_start(
424                transport,
425                CELL_CHANNEL_START,
426            ));
427            Ok(CellSetup {
428                session,
429                path: hub_path,
430                peer_id: Some(peer_id),
431            })
432        }
433    }
434}
435
436/// Run a single-service cell
437///
438/// This function handles all the boilerplate for a simple cell:
439/// - Parses CLI arguments
440/// - Waits for SHM file creation
441/// - Sets up SHM transport and RPC session
442/// - Configures the service dispatcher
443/// - Runs the session loop
444///
445/// # Example
446///
447/// ```rust,ignore
448/// use rapace_cell::{run, ServiceDispatch};
449/// use rapace::{Frame, RpcError};
450/// use std::future::Future;
451/// use std::pin::Pin;
452///
453/// # struct MyServiceServer;
454/// # impl MyServiceServer {
455/// #     fn new(impl_: ()) -> Self { Self }
456/// #     async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
457/// #         unimplemented!()
458/// #     }
459/// # }
460/// # impl ServiceDispatch for MyServiceServer {
461/// #     fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
462/// #         Box::pin(Self::dispatch_impl(self, method_id, payload))
463/// #     }
464/// # }
465///
466/// #[tokio::main]
467/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
468///     let server = MyServiceServer::new(());
469///     run(server).await?;
470///     Ok(())
471/// }
472/// ```
473pub async fn run<S>(service: S) -> Result<(), CellError>
474where
475    S: ServiceDispatch,
476{
477    run_with_config(service, DEFAULT_SHM_CONFIG).await
478}
479
480/// Run a single-service cell with custom SHM configuration
481///
482/// This automatically sets up rapace-tracing to forward logs to the host.
483/// When running in hub mode (with `--peer-id`), this automatically sends a
484/// `CellLifecycle.ready()` signal to the host after the session is established.
485pub async fn run_with_config<S>(service: S, config: ShmSessionConfig) -> Result<(), CellError>
486where
487    S: ServiceDispatch,
488{
489    let setup = setup_cell(config).await?;
490
491    let session = setup.session;
492    let peer_id = setup.peer_id;
493    let cell_name = cell_name_guess();
494
495    // Expose TracingConfig early (so the host can push filters), but don't install
496    // the forwarding tracing layer until after the ready handshake.
497    let (tracing_filter, tracing_service) = tracing_setup::create_tracing_config_service();
498
499    // IMPORTANT: Set up dispatcher before starting demux.
500    // We intentionally delay installing the tracing layer until after ready, to avoid
501    // startup floods on contended transports.
502    session.set_dispatcher(
503        DispatcherBuilder::new()
504            .add_service(tracing_service)
505            .add_service(service)
506            .build(),
507    );
508
509    // Start demux loop in background so we can send ready signal
510    let run_task = {
511        let session = session.clone();
512        tokio::spawn(async move { session.run().await })
513    };
514
515    // Yield to ensure demux task gets scheduled
516    for _ in 0..10 {
517        tokio::task::yield_now().await;
518    }
519
520    // Send ready signal FIRST, before tracing is initialized
521    if let Some(peer_id) = peer_id {
522        let client = CellLifecycleClient::new(session.clone());
523        let msg = ReadyMsg {
524            peer_id,
525            cell_name: cell_name.clone(),
526            pid: Some(std::process::id()),
527            version: None,
528            features: vec![],
529        };
530        if !quiet_mode_enabled() {
531            eprintln!(
532                "[rapace-cell] {} (peer_id={}) sending ready signal...",
533                cell_name, peer_id
534            );
535        }
536        // Retry the handshake; hub slot allocation can be temporarily contended during parallel startup.
537        match ready_handshake_with_backoff(&client, msg).await {
538            Ok(ack) => {
539                if !quiet_mode_enabled() {
540                    eprintln!(
541                        "[rapace-cell] {} (peer_id={}) ready acknowledged: ok={}",
542                        cell_name, peer_id, ack.ok
543                    );
544                }
545            }
546            Err(e) => {
547                if !quiet_mode_enabled() {
548                    eprintln!(
549                        "[rapace-cell] {} (peer_id={}) ready FAILED: {:?}",
550                        cell_name, peer_id, e
551                    );
552                }
553            }
554        }
555    }
556
557    // NOW initialize tracing (after ready signal is confirmed)
558    tracing_setup::install_tracing_layer(session.clone(), tracing_filter);
559    tracing::debug!(target: "cell", cell = %cell_name, "Connected to host via SHM: {}", setup.path.display());
560
561    // Wait for session to complete
562    match run_task.await {
563        Ok(result) => result?,
564        Err(join_err) => {
565            return Err(CellError::Transport(TransportError::Io(
566                std::io::Error::other(format!("demux task join error: {join_err}")),
567            )));
568        }
569    }
570
571    Ok(())
572}
573
574/// Run a single-service cell, but let the service factory access the `RpcSession`.
575///
576/// This variant is useful for cells that need to make outgoing RPC calls during setup.
577/// It starts the demux loop in a background task before invoking `factory`.
578pub async fn run_with_session<F, S>(factory: F) -> Result<(), CellError>
579where
580    F: FnOnce(Arc<RpcSession>) -> S,
581    S: ServiceDispatch,
582{
583    run_with_session_and_config(factory, DEFAULT_SHM_CONFIG).await
584}
585
586/// Run a single-service cell with session access and custom SHM configuration.
587///
588/// This automatically sets up rapace-tracing to forward logs to the host.
589/// When running in hub mode (with `--peer-id`), this automatically sends a
590/// `CellLifecycle.ready()` signal to the host after the session is established.
591pub async fn run_with_session_and_config<F, S>(
592    factory: F,
593    config: ShmSessionConfig,
594) -> Result<(), CellError>
595where
596    F: FnOnce(Arc<RpcSession>) -> S,
597    S: ServiceDispatch,
598{
599    let setup = setup_cell(config).await?;
600
601    let session = setup.session;
602    let peer_id = setup.peer_id;
603    let cell_name = cell_name_guess();
604
605    // Create service from factory (needs session)
606    let service = factory(session.clone());
607
608    // Expose TracingConfig early (so the host can push filters), but don't install
609    // the forwarding tracing layer until after the ready handshake.
610    let (tracing_filter, tracing_service) = tracing_setup::create_tracing_config_service();
611
612    // IMPORTANT: Set up dispatcher before starting demux.
613    session.set_dispatcher(
614        DispatcherBuilder::new()
615            .add_service(tracing_service)
616            .add_service(service)
617            .build(),
618    );
619
620    // Start demux loop in background, so outgoing RPC calls won't deadlock on
621    // current_thread runtimes.
622    let run_task = {
623        let session = session.clone();
624        tokio::spawn(async move { session.run().await })
625    };
626
627    // Yield a few times to ensure the demux task gets scheduled.
628    for _ in 0..10 {
629        tokio::task::yield_now().await;
630    }
631
632    // Send ready signal FIRST, before tracing is initialized
633    if let Some(peer_id) = peer_id {
634        let client = CellLifecycleClient::new(session.clone());
635        let msg = ReadyMsg {
636            peer_id,
637            cell_name: cell_name.clone(),
638            pid: Some(std::process::id()),
639            version: None,
640            features: vec![],
641        };
642        if !quiet_mode_enabled() {
643            eprintln!(
644                "[rapace-cell] {} (peer_id={}) sending ready signal...",
645                cell_name, peer_id
646            );
647        }
648        // Retry the handshake; hub slot allocation can be temporarily contended during parallel startup.
649        match ready_handshake_with_backoff(&client, msg).await {
650            Ok(ack) => {
651                if !quiet_mode_enabled() {
652                    eprintln!(
653                        "[rapace-cell] {} (peer_id={}) ready acknowledged: ok={}",
654                        cell_name, peer_id, ack.ok
655                    );
656                }
657            }
658            Err(e) => {
659                if !quiet_mode_enabled() {
660                    eprintln!(
661                        "[rapace-cell] {} (peer_id={}) ready FAILED: {:?}",
662                        cell_name, peer_id, e
663                    );
664                }
665            }
666        }
667    }
668
669    // NOW initialize tracing (after ready signal is confirmed)
670    tracing_setup::install_tracing_layer(session.clone(), tracing_filter);
671    tracing::debug!(target: "cell", cell = %cell_name, "Connected to host via SHM: {}", setup.path.display());
672
673    match run_task.await {
674        Ok(result) => result?,
675        Err(join_err) => {
676            return Err(CellError::Transport(TransportError::Io(
677                std::io::Error::other(format!("demux task join error: {join_err}")),
678            )));
679        }
680    }
681
682    Ok(())
683}
684
685fn ready_total_timeout() -> std::time::Duration {
686    // Keep compatibility with dodeca's historical knob while providing a generic name too.
687    let timeout_ms = std::env::var("RAPACE_CELL_READY_TIMEOUT_MS")
688        .ok()
689        .and_then(|s| s.parse::<u64>().ok())
690        .or_else(|| {
691            std::env::var("DODECA_CELL_READY_TIMEOUT_MS")
692                .ok()
693                .and_then(|s| s.parse::<u64>().ok())
694        })
695        .unwrap_or(10_000);
696
697    std::time::Duration::from_millis(timeout_ms)
698}
699
700async fn ready_handshake_with_backoff(
701    client: &CellLifecycleClient,
702    msg: ReadyMsg,
703) -> Result<ReadyAck, RpcError> {
704    let timeout = ready_total_timeout();
705
706    let start = std::time::Instant::now();
707    let mut delay_ms = 10u64;
708
709    loop {
710        match client.ready(msg.clone()).await {
711            Ok(ack) => return Ok(ack),
712            Err(e) => {
713                if start.elapsed() >= timeout {
714                    return Err(e);
715                }
716                tracing::debug!(
717                    cell = %msg.cell_name,
718                    peer_id = msg.peer_id,
719                    error = ?e,
720                    delay_ms,
721                    "Ready handshake failed; retrying"
722                );
723            }
724        }
725
726        tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
727        delay_ms = (delay_ms * 2).min(200);
728    }
729}
730
731/// Run a multi-service cell
732///
733/// This function handles all the boilerplate for a multi-service cell.
734/// The builder function receives a `DispatcherBuilder` to configure which
735/// services the cell exposes.
736///
737/// # Example
738///
739/// ```rust,ignore
740/// use rapace_cell::{run_multi, DispatcherBuilder, ServiceDispatch};
741/// use rapace::{Frame, RpcError};
742/// use std::future::Future;
743/// use std::pin::Pin;
744///
745/// # struct MyServiceServer;
746/// # struct AnotherServiceServer;
747/// # impl MyServiceServer {
748/// #     fn new(impl_: ()) -> Self { Self }
749/// #     async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
750/// #         unimplemented!()
751/// #     }
752/// # }
753/// # impl AnotherServiceServer {
754/// #     fn new(impl_: ()) -> Self { Self }
755/// #     async fn dispatch_impl(&self, method_id: u32, payload: &[u8]) -> Result<Frame, RpcError> {
756/// #         unimplemented!()
757/// #     }
758/// # }
759/// # impl ServiceDispatch for MyServiceServer {
760/// #     fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
761/// #         Box::pin(Self::dispatch_impl(self, method_id, payload))
762/// #     }
763/// # }
764/// # impl ServiceDispatch for AnotherServiceServer {
765/// #     fn dispatch(&self, method_id: u32, payload: &[u8]) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>> {
766/// #         Box::pin(Self::dispatch_impl(self, method_id, payload))
767/// #     }
768/// # }
769///
770/// #[tokio::main]
771/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
772///     run_multi(|builder| {
773///         builder
774///             .add_service(MyServiceServer::new(()))
775///             .add_service(AnotherServiceServer::new(()))
776///     }).await?;
777///     Ok(())
778/// }
779/// ```
780pub async fn run_multi<F>(builder_fn: F) -> Result<(), CellError>
781where
782    F: FnOnce(DispatcherBuilder) -> DispatcherBuilder,
783{
784    run_multi_with_config(builder_fn, DEFAULT_SHM_CONFIG).await
785}
786
787/// Run a multi-service cell with custom SHM configuration
788///
789/// This automatically sets up rapace-tracing to forward logs to the host.
790pub async fn run_multi_with_config<F>(
791    builder_fn: F,
792    config: ShmSessionConfig,
793) -> Result<(), CellError>
794where
795    F: FnOnce(DispatcherBuilder) -> DispatcherBuilder,
796{
797    let setup = setup_cell(config).await?;
798
799    let session = setup.session;
800    let peer_id = setup.peer_id;
801    let cell_name = cell_name_guess();
802
803    // Expose TracingConfig early (so the host can push filters), but don't install
804    // the forwarding tracing layer until after the ready handshake.
805    let (tracing_filter, tracing_service) = tracing_setup::create_tracing_config_service();
806
807    // Build the dispatcher with user services + tracing config
808    let builder = DispatcherBuilder::new();
809    let builder = builder_fn(builder);
810    let builder = builder.add_service(tracing_service);
811    let dispatcher = builder.build();
812
813    session.set_dispatcher(dispatcher);
814
815    // Start demux loop in background so we can send ready signal
816    let run_task = {
817        let session = session.clone();
818        tokio::spawn(async move { session.run().await })
819    };
820
821    // Yield a few times to ensure the demux task gets scheduled.
822    for _ in 0..10 {
823        tokio::task::yield_now().await;
824    }
825
826    // Send ready signal (hub mode only)
827    if let Some(peer_id) = peer_id {
828        let client = CellLifecycleClient::new(session.clone());
829        let msg = ReadyMsg {
830            peer_id,
831            cell_name: cell_name.clone(),
832            pid: Some(std::process::id()),
833            version: None,
834            features: vec![],
835        };
836        if !quiet_mode_enabled() {
837            eprintln!(
838                "[rapace-cell] {} (peer_id={}) sending ready signal...",
839                cell_name, peer_id
840            );
841        }
842        match ready_handshake_with_backoff(&client, msg).await {
843            Ok(ack) => {
844                if !quiet_mode_enabled() {
845                    eprintln!(
846                        "[rapace-cell] {} (peer_id={}) ready acknowledged: ok={}",
847                        cell_name, peer_id, ack.ok
848                    );
849                }
850            }
851            Err(e) => {
852                if !quiet_mode_enabled() {
853                    eprintln!(
854                        "[rapace-cell] {} (peer_id={}) ready FAILED: {:?}",
855                        cell_name, peer_id, e
856                    );
857                }
858            }
859        }
860    }
861
862    // Install tracing forwarding now that the cell is ready.
863    tracing_setup::install_tracing_layer(session.clone(), tracing_filter);
864    tracing::debug!(target: "cell", cell = %cell_name, "Connected to host via SHM: {}", setup.path.display());
865
866    // Wait for session to complete
867    match run_task.await {
868        Ok(result) => result?,
869        Err(join_err) => {
870            return Err(CellError::Transport(TransportError::Io(
871                std::io::Error::other(format!("demux task join error: {join_err}")),
872            )));
873        }
874    }
875
876    Ok(())
877}
878
879/// Extension trait for RpcSession to support single-service setup
880pub trait RpcSessionExt {
881    /// Set a single service as the dispatcher for this session
882    ///
883    /// This is a convenience method for cells that only expose one service.
884    /// For multi-service cells, use `set_dispatcher` with a `DispatcherBuilder`.
885    fn set_service<S>(&self, service: S)
886    where
887        S: ServiceDispatch;
888}
889
890impl RpcSessionExt for RpcSession {
891    fn set_service<S>(&self, service: S)
892    where
893        S: ServiceDispatch,
894    {
895        let service = Arc::new(service);
896        let dispatcher = move |request: Frame| {
897            let service = service.clone();
898            Box::pin(async move {
899                let mut response = service
900                    .dispatch(request.desc.method_id, request.payload_bytes())
901                    .await?;
902                response.desc.channel_id = request.desc.channel_id;
903                response.desc.msg_id = request.desc.msg_id;
904                Ok(response)
905            })
906        };
907        self.set_dispatcher(dispatcher);
908    }
909}
910
/// Macro to run a cell with minimal boilerplate.
///
/// Expands to a `current_thread` `#[tokio::main] async fn main()` that calls
/// `rapace_cell::run(...)` with the given service expression. Any error is returned as a
/// `Box<dyn std::error::Error>`. The invoking crate must depend on `tokio` directly,
/// since the expansion uses the `#[tokio::main]` attribute.
///
/// Paths in the expansion are absolute (`::std::...`) so it keeps compiling even when the
/// invoking module defines its own `Result` alias or shadows other prelude names.
#[macro_export]
macro_rules! run_cell {
    ($service:expr) => {
        #[tokio::main(flavor = "current_thread")]
        async fn main(
        ) -> ::std::result::Result<(), ::std::boxed::Box<dyn ::std::error::Error>> {
            $crate::run($service).await?;
            ::std::result::Result::Ok(())
        }
    };
}
924
/// Macro to run a cell whose setup needs access to the RPC session.
///
/// Expands to a `current_thread` `#[tokio::main] async fn main()` that calls
/// `rapace_cell::run_with_session(...)` with the given factory expression. Any error is
/// returned as a `Box<dyn std::error::Error>`. The invoking crate must depend on `tokio`
/// directly.
///
/// Paths in the expansion are absolute (`::std::...`) so it keeps compiling even when the
/// invoking module defines its own `Result` alias or shadows other prelude names.
#[macro_export]
macro_rules! run_cell_with_session {
    ($factory:expr) => {
        #[tokio::main(flavor = "current_thread")]
        async fn main(
        ) -> ::std::result::Result<(), ::std::boxed::Box<dyn ::std::error::Error>> {
            $crate::run_with_session($factory).await?;
            ::std::result::Result::Ok(())
        }
    };
}
938
/// Macro to wrap a generated server type as a `ServiceDispatch` cell service.
///
/// This is convenient when a proc-macro generates `FooServer<T>` where `FooServer::new(T)`
/// constructs the server and `FooServer::dispatch(method_id, bytes)` routes calls.
///
/// Expands to a private `struct CellService` implementing [`ServiceDispatch`] plus
/// `From<$impl_type>` (which builds the server via `new`). Because the struct name is
/// fixed, invoke it at most once per module. Paths in the expansion are absolute so the
/// invoking module's imports cannot change their meaning.
#[macro_export]
macro_rules! cell_service {
    ($server_type:ty, $impl_type:ty) => {
        struct CellService(::std::sync::Arc<$server_type>);

        impl $crate::ServiceDispatch for CellService {
            fn dispatch(
                &self,
                method_id: u32,
                payload: &[u8],
            ) -> ::std::pin::Pin<
                ::std::boxed::Box<
                    dyn ::std::future::Future<
                            Output = ::std::result::Result<$crate::Frame, $crate::RpcError>,
                        > + ::std::marker::Send
                        + 'static,
                >,
            > {
                // The future must be 'static: take our own handle to the server and our
                // own copy of the payload bytes.
                let server = ::std::sync::Arc::clone(&self.0);
                let bytes = payload.to_vec();
                ::std::boxed::Box::pin(async move { server.dispatch(method_id, &bytes).await })
            }
        }

        impl ::std::convert::From<$impl_type> for CellService {
            fn from(impl_val: $impl_type) -> Self {
                Self(::std::sync::Arc::new(<$server_type>::new(impl_val)))
            }
        }
    };
}