1#![doc = include_str!("../README.md")]
2
3use std::error::Error as StdError;
4use std::future::Future;
5use std::path::PathBuf;
6use std::pin::Pin;
7use std::sync::Arc;
8
9use rapace::transport::shm::{ShmSession, ShmSessionConfig, ShmTransport};
10use rapace::{Transport, TransportError};
11
12pub use rapace::{Frame, RpcError, RpcSession};
15
16pub mod lifecycle;
17pub use lifecycle::{CellLifecycle, CellLifecycleClient, CellLifecycleServer, ReadyAck, ReadyMsg};
18
19pub mod tracing_setup;
20pub use tracing_setup::TracingConfigService;
21
22#[cfg(unix)]
23use rapace::transport::shm::{Doorbell, HubPeer};
24#[cfg(unix)]
25use std::os::unix::io::RawFd;
26
/// Reports whether quiet mode has been requested through the environment.
///
/// Returns `true` when `RAPACE_QUIET` or `DODECA_QUIET` is set to anything
/// other than the empty string, `"0"`, or `"false"` (ASCII case-insensitive).
/// `RAPACE_QUIET` is consulted first; `DODECA_QUIET` is only read when the
/// former is not truthy.
fn quiet_mode_enabled() -> bool {
    ["RAPACE_QUIET", "DODECA_QUIET"].iter().any(|key| {
        std::env::var_os(key).map_or(false, |raw| {
            // Non-UTF-8 values are converted lossily; they can only compare
            // unequal to the "off" spellings below.
            let text = raw.to_string_lossy();
            let switched_off =
                text.is_empty() || text == "0" || text.eq_ignore_ascii_case("false");
            !switched_off
        })
    })
}
41
42pub const DEFAULT_SHM_CONFIG: ShmSessionConfig = ShmSessionConfig {
49 ring_capacity: 256, slot_size: 65536, slot_count: 128, };
53
/// Channel-start value handed to `RpcSession::with_channel_start` whenever a
/// cell creates its session.
// NOTE(review): presumably chosen so cell-allocated channel ids do not collide
// with the host's — confirm against the host side.
const CELL_CHANNEL_START: u32 = 2;
57
58#[derive(Debug)]
60pub enum CellError {
61 Args(String),
63 ShmTimeout(PathBuf),
65 HubTimeout(PathBuf),
67 ShmOpen(String),
69 HubOpen(String),
71 HubArgs(String),
73 DoorbellFd(String),
75 Rpc(RpcError),
77 Transport(TransportError),
79}
80
81impl std::fmt::Display for CellError {
82 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 match self {
84 Self::Args(msg) => write!(f, "Argument error: {}", msg),
85 Self::ShmTimeout(path) => write!(f, "SHM file not created by host: {}", path.display()),
86 Self::HubTimeout(path) => write!(f, "Hub file not created by host: {}", path.display()),
87 Self::ShmOpen(msg) => write!(f, "Failed to open SHM: {}", msg),
88 Self::HubOpen(msg) => write!(f, "Failed to open hub: {}", msg),
89 Self::HubArgs(msg) => write!(f, "Hub argument error: {}", msg),
90 Self::DoorbellFd(msg) => write!(f, "Doorbell fd error: {}", msg),
91 Self::Rpc(e) => write!(f, "RPC error: {:?}", e),
92 Self::Transport(e) => write!(f, "Transport error: {:?}", e),
93 }
94 }
95}
96
97impl StdError for CellError {}
98
99impl From<RpcError> for CellError {
100 fn from(e: RpcError) -> Self {
101 Self::Rpc(e)
102 }
103}
104
105impl From<TransportError> for CellError {
106 fn from(e: TransportError) -> Self {
107 Self::Transport(e)
108 }
109}
110
111pub trait ServiceDispatch: Send + Sync + 'static {
113 fn dispatch(
115 &self,
116 method_id: u32,
117 payload: &[u8],
118 ) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>>;
119}
120
121pub struct DispatcherBuilder {
123 services: Vec<Box<dyn ServiceDispatch>>,
124}
125
126impl DispatcherBuilder {
127 pub fn new() -> Self {
129 Self {
130 services: Vec::new(),
131 }
132 }
133
134 pub fn add_service<S>(mut self, service: S) -> Self
136 where
137 S: ServiceDispatch,
138 {
139 self.services.push(Box::new(service));
140 self
141 }
142
143 #[cfg(feature = "introspection")]
160 pub fn with_introspection(self) -> Self {
161 use rapace_introspection::{DefaultServiceIntrospection, ServiceIntrospectionServer};
162
163 let introspection = DefaultServiceIntrospection::new();
164 let server = Arc::new(ServiceIntrospectionServer::new(introspection));
165
166 struct IntrospectionDispatcher(
168 Arc<ServiceIntrospectionServer<DefaultServiceIntrospection>>,
169 );
170
171 impl ServiceDispatch for IntrospectionDispatcher {
172 fn dispatch(
173 &self,
174 method_id: u32,
175 payload: &[u8],
176 ) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>>
177 {
178 let payload_owned = payload.to_vec();
180 let server = self.0.clone();
181 Box::pin(async move { server.dispatch(method_id, &payload_owned).await })
182 }
183 }
184
185 self.add_service(IntrospectionDispatcher(server))
186 }
187
188 #[allow(clippy::type_complexity)]
190 pub fn build(
191 self,
192 ) -> impl Fn(Frame) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send>>
193 + Send
194 + Sync
195 + 'static {
196 let services = Arc::new(self.services);
197 move |request: Frame| {
198 let services = services.clone();
199 Box::pin(async move {
200 let method_id = request.desc.method_id;
201 let payload = request.payload_bytes();
202
203 for service in services.iter() {
205 let result = service.dispatch(method_id, payload).await;
206
207 if !matches!(
209 &result,
210 Err(RpcError::Status {
211 code: rapace::ErrorCode::Unimplemented,
212 ..
213 })
214 ) {
215 let mut response = result?;
216 response.desc.channel_id = request.desc.channel_id;
217 response.desc.msg_id = request.desc.msg_id;
218 return Ok(response);
219 }
220 }
221
222 let error_msg = rapace_registry::ServiceRegistry::with_global(|reg| {
224 if let Some(method) = reg.method_by_id(rapace_registry::MethodId(method_id)) {
225 format!(
226 "Method '{}' (id={}) exists in registry but is not implemented by any service in this cell",
227 method.full_name, method_id
228 )
229 } else {
230 format!(
231 "Unknown method_id: {} (not registered in global registry)",
232 method_id
233 )
234 }
235 });
236
237 Err(RpcError::Status {
238 code: rapace::ErrorCode::Unimplemented,
239 message: error_msg,
240 })
241 })
242 }
243 }
244}
245
246impl Default for DispatcherBuilder {
247 fn default() -> Self {
248 Self::new()
249 }
250}
251
/// Connection mode selected by the command-line arguments.
enum ParsedArgs {
    /// Pair mode: a single SHM file shared with the host.
    Pair {
        /// Path of the SHM file to open.
        shm_path: PathBuf,
    },
    /// Hub mode (unix only): the cell joins a hub as one of its peers.
    #[cfg(unix)]
    Hub {
        /// Path of the hub file to open.
        hub_path: PathBuf,
        /// Peer id given via `--peer-id=`.
        peer_id: u16,
        /// Doorbell file descriptor given via `--doorbell-fd=`.
        doorbell_fd: RawFd,
    },
}
263
264fn parse_args() -> Result<ParsedArgs, CellError> {
266 let mut shm_path: Option<PathBuf> = None;
267 let mut hub_path: Option<PathBuf> = None;
268 let mut peer_id: Option<u16> = None;
269 #[cfg(unix)]
270 let mut doorbell_fd: Option<RawFd> = None;
271
272 for arg in std::env::args().skip(1) {
273 if let Some(value) = arg.strip_prefix("--shm-path=") {
274 shm_path = Some(PathBuf::from(value));
275 } else if let Some(value) = arg.strip_prefix("--hub-path=") {
276 hub_path = Some(PathBuf::from(value));
277 } else if let Some(value) = arg.strip_prefix("--peer-id=") {
278 peer_id = value.parse::<u16>().ok();
279 } else if let Some(value) = arg.strip_prefix("--doorbell-fd=") {
280 #[cfg(unix)]
281 {
282 doorbell_fd = value.parse::<i32>().ok();
283 }
284 } else if !arg.starts_with("--") && shm_path.is_none() && hub_path.is_none() {
285 shm_path = Some(PathBuf::from(arg));
287 }
288 }
289
290 if let Some(hub_path) = hub_path {
291 #[cfg(not(unix))]
292 {
293 return Err(CellError::HubArgs(
294 "hub mode is only supported on unix platforms".to_string(),
295 ));
296 }
297
298 #[cfg(unix)]
299 {
300 let peer_id = peer_id
301 .ok_or_else(|| CellError::HubArgs("Missing --peer-id for hub mode".to_string()))?;
302 let doorbell_fd = doorbell_fd.ok_or_else(|| {
303 CellError::HubArgs("Missing --doorbell-fd for hub mode".to_string())
304 })?;
305 return Ok(ParsedArgs::Hub {
306 hub_path,
307 peer_id,
308 doorbell_fd,
309 });
310 }
311 }
312
313 if let Some(shm_path) = shm_path {
314 return Ok(ParsedArgs::Pair { shm_path });
315 }
316
317 Err(CellError::Args(
318 "Missing SHM path (use --shm-path=PATH or provide as first argument)".to_string(),
319 ))
320}
321
322async fn wait_for_shm(path: &std::path::Path, timeout_ms: u64) -> Result<(), CellError> {
324 let attempts = timeout_ms / 100;
325 for i in 0..attempts {
326 if path.exists() {
327 return Ok(());
328 }
329 if i < attempts - 1 {
330 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
331 }
332 }
333 Err(CellError::ShmTimeout(path.to_path_buf()))
334}
335
336async fn wait_for_hub(path: &std::path::Path, timeout_ms: u64) -> Result<(), CellError> {
337 let attempts = timeout_ms / 100;
338 for i in 0..attempts {
339 if path.exists() {
340 return Ok(());
341 }
342 if i < attempts - 1 {
343 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
344 }
345 }
346 Err(CellError::HubTimeout(path.to_path_buf()))
347}
348
349#[cfg(unix)]
350fn validate_doorbell_fd(fd: RawFd) -> Result<(), CellError> {
351 let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
352 if flags < 0 {
353 return Err(CellError::DoorbellFd(format!(
354 "doorbell fd {fd} is invalid: {}",
355 std::io::Error::last_os_error()
356 )));
357 }
358 Ok(())
359}
360
/// Derives a cell name from the current executable's file stem.
///
/// Falls back to `"cell"` when the executable path cannot be determined or
/// has no file stem. Non-UTF-8 stems are converted lossily.
fn cell_name_guess() -> String {
    let stem = match std::env::current_exe() {
        Ok(exe) => exe
            .file_stem()
            .map(|name| name.to_string_lossy().into_owned()),
        Err(_) => None,
    };
    stem.unwrap_or_else(|| String::from("cell"))
}
367
368struct CellSetup {
370 session: Arc<RpcSession>,
371 #[allow(dead_code)]
372 path: PathBuf,
373 peer_id: Option<u16>,
375}
376
377async fn setup_cell(config: ShmSessionConfig) -> Result<CellSetup, CellError> {
379 ur_taking_me_with_you::die_with_parent();
382
383 match parse_args()? {
384 ParsedArgs::Pair { shm_path } => {
385 wait_for_shm(&shm_path, 5000).await?;
386
387 let shm_session = ShmSession::open_file(&shm_path, config)
388 .map_err(|e| CellError::ShmOpen(format!("{:?}", e)))?;
389
390 let transport = Transport::Shm(ShmTransport::new(shm_session));
391 let session = Arc::new(RpcSession::with_channel_start(
392 transport,
393 CELL_CHANNEL_START,
394 ));
395 Ok(CellSetup {
396 session,
397 path: shm_path,
398 peer_id: None,
399 })
400 }
401 #[cfg(unix)]
402 ParsedArgs::Hub {
403 hub_path,
404 peer_id,
405 doorbell_fd,
406 } => {
407 wait_for_hub(&hub_path, 5000).await?;
408 validate_doorbell_fd(doorbell_fd)?;
409
410 let peer = HubPeer::open(&hub_path, peer_id)
411 .map_err(|e| CellError::HubOpen(format!("{:?}", e)))?;
412 peer.register();
413
414 let doorbell = Doorbell::from_raw_fd(doorbell_fd)
415 .map_err(|e| CellError::DoorbellFd(format!("{:?}", e)))?;
416
417 let transport = Transport::Shm(ShmTransport::hub_peer(
418 Arc::new(peer),
419 doorbell,
420 cell_name_guess(),
421 ));
422
423 let session = Arc::new(RpcSession::with_channel_start(
424 transport,
425 CELL_CHANNEL_START,
426 ));
427 Ok(CellSetup {
428 session,
429 path: hub_path,
430 peer_id: Some(peer_id),
431 })
432 }
433 }
434}
435
436pub async fn run<S>(service: S) -> Result<(), CellError>
474where
475 S: ServiceDispatch,
476{
477 run_with_config(service, DEFAULT_SHM_CONFIG).await
478}
479
480pub async fn run_with_config<S>(service: S, config: ShmSessionConfig) -> Result<(), CellError>
486where
487 S: ServiceDispatch,
488{
489 let setup = setup_cell(config).await?;
490
491 let session = setup.session;
492 let peer_id = setup.peer_id;
493 let cell_name = cell_name_guess();
494
495 let (tracing_filter, tracing_service) = tracing_setup::create_tracing_config_service();
498
499 session.set_dispatcher(
503 DispatcherBuilder::new()
504 .add_service(tracing_service)
505 .add_service(service)
506 .build(),
507 );
508
509 let run_task = {
511 let session = session.clone();
512 tokio::spawn(async move { session.run().await })
513 };
514
515 for _ in 0..10 {
517 tokio::task::yield_now().await;
518 }
519
520 if let Some(peer_id) = peer_id {
522 let client = CellLifecycleClient::new(session.clone());
523 let msg = ReadyMsg {
524 peer_id,
525 cell_name: cell_name.clone(),
526 pid: Some(std::process::id()),
527 version: None,
528 features: vec![],
529 };
530 if !quiet_mode_enabled() {
531 eprintln!(
532 "[rapace-cell] {} (peer_id={}) sending ready signal...",
533 cell_name, peer_id
534 );
535 }
536 match ready_handshake_with_backoff(&client, msg).await {
538 Ok(ack) => {
539 if !quiet_mode_enabled() {
540 eprintln!(
541 "[rapace-cell] {} (peer_id={}) ready acknowledged: ok={}",
542 cell_name, peer_id, ack.ok
543 );
544 }
545 }
546 Err(e) => {
547 if !quiet_mode_enabled() {
548 eprintln!(
549 "[rapace-cell] {} (peer_id={}) ready FAILED: {:?}",
550 cell_name, peer_id, e
551 );
552 }
553 }
554 }
555 }
556
557 tracing_setup::install_tracing_layer(session.clone(), tracing_filter);
559 tracing::debug!(target: "cell", cell = %cell_name, "Connected to host via SHM: {}", setup.path.display());
560
561 match run_task.await {
563 Ok(result) => result?,
564 Err(join_err) => {
565 return Err(CellError::Transport(TransportError::Io(
566 std::io::Error::other(format!("demux task join error: {join_err}")),
567 )));
568 }
569 }
570
571 Ok(())
572}
573
574pub async fn run_with_session<F, S>(factory: F) -> Result<(), CellError>
579where
580 F: FnOnce(Arc<RpcSession>) -> S,
581 S: ServiceDispatch,
582{
583 run_with_session_and_config(factory, DEFAULT_SHM_CONFIG).await
584}
585
586pub async fn run_with_session_and_config<F, S>(
592 factory: F,
593 config: ShmSessionConfig,
594) -> Result<(), CellError>
595where
596 F: FnOnce(Arc<RpcSession>) -> S,
597 S: ServiceDispatch,
598{
599 let setup = setup_cell(config).await?;
600
601 let session = setup.session;
602 let peer_id = setup.peer_id;
603 let cell_name = cell_name_guess();
604
605 let service = factory(session.clone());
607
608 let (tracing_filter, tracing_service) = tracing_setup::create_tracing_config_service();
611
612 session.set_dispatcher(
614 DispatcherBuilder::new()
615 .add_service(tracing_service)
616 .add_service(service)
617 .build(),
618 );
619
620 let run_task = {
623 let session = session.clone();
624 tokio::spawn(async move { session.run().await })
625 };
626
627 for _ in 0..10 {
629 tokio::task::yield_now().await;
630 }
631
632 if let Some(peer_id) = peer_id {
634 let client = CellLifecycleClient::new(session.clone());
635 let msg = ReadyMsg {
636 peer_id,
637 cell_name: cell_name.clone(),
638 pid: Some(std::process::id()),
639 version: None,
640 features: vec![],
641 };
642 if !quiet_mode_enabled() {
643 eprintln!(
644 "[rapace-cell] {} (peer_id={}) sending ready signal...",
645 cell_name, peer_id
646 );
647 }
648 match ready_handshake_with_backoff(&client, msg).await {
650 Ok(ack) => {
651 if !quiet_mode_enabled() {
652 eprintln!(
653 "[rapace-cell] {} (peer_id={}) ready acknowledged: ok={}",
654 cell_name, peer_id, ack.ok
655 );
656 }
657 }
658 Err(e) => {
659 if !quiet_mode_enabled() {
660 eprintln!(
661 "[rapace-cell] {} (peer_id={}) ready FAILED: {:?}",
662 cell_name, peer_id, e
663 );
664 }
665 }
666 }
667 }
668
669 tracing_setup::install_tracing_layer(session.clone(), tracing_filter);
671 tracing::debug!(target: "cell", cell = %cell_name, "Connected to host via SHM: {}", setup.path.display());
672
673 match run_task.await {
674 Ok(result) => result?,
675 Err(join_err) => {
676 return Err(CellError::Transport(TransportError::Io(
677 std::io::Error::other(format!("demux task join error: {join_err}")),
678 )));
679 }
680 }
681
682 Ok(())
683}
684
/// Returns the total time budget for the ready handshake.
///
/// Reads a millisecond count from `RAPACE_CELL_READY_TIMEOUT_MS`; if that is
/// unset or not a valid `u64`, falls back to `DODECA_CELL_READY_TIMEOUT_MS`,
/// and finally to 10 000 ms.
fn ready_total_timeout() -> std::time::Duration {
    const DEFAULT_TIMEOUT_MS: u64 = 10_000;

    // Unset, non-UTF-8 and unparsable values are all treated as "absent".
    let read_millis =
        |key: &str| -> Option<u64> { std::env::var(key).ok()?.parse::<u64>().ok() };

    let timeout_ms = match read_millis("RAPACE_CELL_READY_TIMEOUT_MS") {
        Some(ms) => ms,
        None => read_millis("DODECA_CELL_READY_TIMEOUT_MS").unwrap_or(DEFAULT_TIMEOUT_MS),
    };

    std::time::Duration::from_millis(timeout_ms)
}
699
700async fn ready_handshake_with_backoff(
701 client: &CellLifecycleClient,
702 msg: ReadyMsg,
703) -> Result<ReadyAck, RpcError> {
704 let timeout = ready_total_timeout();
705
706 let start = std::time::Instant::now();
707 let mut delay_ms = 10u64;
708
709 loop {
710 match client.ready(msg.clone()).await {
711 Ok(ack) => return Ok(ack),
712 Err(e) => {
713 if start.elapsed() >= timeout {
714 return Err(e);
715 }
716 tracing::debug!(
717 cell = %msg.cell_name,
718 peer_id = msg.peer_id,
719 error = ?e,
720 delay_ms,
721 "Ready handshake failed; retrying"
722 );
723 }
724 }
725
726 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
727 delay_ms = (delay_ms * 2).min(200);
728 }
729}
730
731pub async fn run_multi<F>(builder_fn: F) -> Result<(), CellError>
781where
782 F: FnOnce(DispatcherBuilder) -> DispatcherBuilder,
783{
784 run_multi_with_config(builder_fn, DEFAULT_SHM_CONFIG).await
785}
786
787pub async fn run_multi_with_config<F>(
791 builder_fn: F,
792 config: ShmSessionConfig,
793) -> Result<(), CellError>
794where
795 F: FnOnce(DispatcherBuilder) -> DispatcherBuilder,
796{
797 let setup = setup_cell(config).await?;
798
799 let session = setup.session;
800 let peer_id = setup.peer_id;
801 let cell_name = cell_name_guess();
802
803 let (tracing_filter, tracing_service) = tracing_setup::create_tracing_config_service();
806
807 let builder = DispatcherBuilder::new();
809 let builder = builder_fn(builder);
810 let builder = builder.add_service(tracing_service);
811 let dispatcher = builder.build();
812
813 session.set_dispatcher(dispatcher);
814
815 let run_task = {
817 let session = session.clone();
818 tokio::spawn(async move { session.run().await })
819 };
820
821 for _ in 0..10 {
823 tokio::task::yield_now().await;
824 }
825
826 if let Some(peer_id) = peer_id {
828 let client = CellLifecycleClient::new(session.clone());
829 let msg = ReadyMsg {
830 peer_id,
831 cell_name: cell_name.clone(),
832 pid: Some(std::process::id()),
833 version: None,
834 features: vec![],
835 };
836 if !quiet_mode_enabled() {
837 eprintln!(
838 "[rapace-cell] {} (peer_id={}) sending ready signal...",
839 cell_name, peer_id
840 );
841 }
842 match ready_handshake_with_backoff(&client, msg).await {
843 Ok(ack) => {
844 if !quiet_mode_enabled() {
845 eprintln!(
846 "[rapace-cell] {} (peer_id={}) ready acknowledged: ok={}",
847 cell_name, peer_id, ack.ok
848 );
849 }
850 }
851 Err(e) => {
852 if !quiet_mode_enabled() {
853 eprintln!(
854 "[rapace-cell] {} (peer_id={}) ready FAILED: {:?}",
855 cell_name, peer_id, e
856 );
857 }
858 }
859 }
860 }
861
862 tracing_setup::install_tracing_layer(session.clone(), tracing_filter);
864 tracing::debug!(target: "cell", cell = %cell_name, "Connected to host via SHM: {}", setup.path.display());
865
866 match run_task.await {
868 Ok(result) => result?,
869 Err(join_err) => {
870 return Err(CellError::Transport(TransportError::Io(
871 std::io::Error::other(format!("demux task join error: {join_err}")),
872 )));
873 }
874 }
875
876 Ok(())
877}
878
879pub trait RpcSessionExt {
881 fn set_service<S>(&self, service: S)
886 where
887 S: ServiceDispatch;
888}
889
890impl RpcSessionExt for RpcSession {
891 fn set_service<S>(&self, service: S)
892 where
893 S: ServiceDispatch,
894 {
895 let service = Arc::new(service);
896 let dispatcher = move |request: Frame| {
897 let service = service.clone();
898 Box::pin(async move {
899 let mut response = service
900 .dispatch(request.desc.method_id, request.payload_bytes())
901 .await?;
902 response.desc.channel_id = request.desc.channel_id;
903 response.desc.msg_id = request.desc.msg_id;
904 Ok(response)
905 })
906 };
907 self.set_dispatcher(dispatcher);
908 }
909}
910
/// Generates an `async fn main` that runs `$service` as a cell.
///
/// The generated `main` uses a current-thread tokio runtime and forwards to
/// `$crate::run`, converting any error into `Box<dyn Error>`. The calling
/// crate must depend on `tokio`, since the expansion uses `#[tokio::main]`.
///
/// Standard-library items are referenced through absolute `::std` paths so
/// the expansion keeps working when the caller shadows `Result`, `Box`, `Ok`
/// or has a local module named `std`.
#[macro_export]
macro_rules! run_cell {
    ($service:expr) => {
        #[tokio::main(flavor = "current_thread")]
        async fn main(
        ) -> ::std::result::Result<(), ::std::boxed::Box<dyn ::std::error::Error>> {
            $crate::run($service).await?;
            ::std::result::Result::Ok(())
        }
    };
}
924
/// Generates an `async fn main` that runs a cell whose service is built by
/// `$factory` from the session.
///
/// The generated `main` uses a current-thread tokio runtime and forwards to
/// `$crate::run_with_session`, converting any error into `Box<dyn Error>`.
/// The calling crate must depend on `tokio`, since the expansion uses
/// `#[tokio::main]`.
///
/// Standard-library items are referenced through absolute `::std` paths so
/// the expansion keeps working when the caller shadows `Result`, `Box`, `Ok`
/// or has a local module named `std`.
#[macro_export]
macro_rules! run_cell_with_session {
    ($factory:expr) => {
        #[tokio::main(flavor = "current_thread")]
        async fn main(
        ) -> ::std::result::Result<(), ::std::boxed::Box<dyn ::std::error::Error>> {
            $crate::run_with_session($factory).await?;
            ::std::result::Result::Ok(())
        }
    };
}
938
/// Defines a `CellService` wrapper that adapts `$server_type` to
/// `$crate::ServiceDispatch`.
///
/// The expansion declares:
/// - `struct CellService`, holding the server in an `Arc`;
/// - a `ServiceDispatch` impl that copies the payload into the returned
///   future and forwards to `$server_type`'s `dispatch(method_id, &bytes)`;
/// - `From<$impl_type>`, which builds the server with `<$server_type>::new`.
///
/// Because the struct name is fixed, the macro can be invoked at most once
/// per module. Standard-library items are referenced through absolute
/// `::std` paths so the expansion does not depend on what the caller has in
/// scope.
#[macro_export]
macro_rules! cell_service {
    ($server_type:ty, $impl_type:ty) => {
        struct CellService(::std::sync::Arc<$server_type>);

        impl $crate::ServiceDispatch for CellService {
            fn dispatch(
                &self,
                method_id: u32,
                payload: &[u8],
            ) -> ::std::pin::Pin<
                ::std::boxed::Box<
                    dyn ::std::future::Future<
                            Output = ::std::result::Result<$crate::Frame, $crate::RpcError>,
                        > + ::std::marker::Send
                        + 'static,
                >,
            > {
                // The returned future is 'static, so it owns both the server
                // handle and a copy of the payload.
                let server = ::std::sync::Arc::clone(&self.0);
                let bytes = payload.to_vec();
                ::std::boxed::Box::pin(async move { server.dispatch(method_id, &bytes).await })
            }
        }

        impl ::std::convert::From<$impl_type> for CellService {
            fn from(impl_val: $impl_type) -> Self {
                Self(::std::sync::Arc::new(<$server_type>::new(impl_val)))
            }
        }
    };
}