#![doc = include_str!("../README.md")]
use std::error::Error as StdError;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use rapace::transport::shm::{ShmSession, ShmSessionConfig, ShmTransport};
use rapace::{Transport, TransportError};
pub use rapace::{Frame, RpcError, RpcSession};
pub mod lifecycle;
pub use lifecycle::{CellLifecycle, CellLifecycleClient, CellLifecycleServer, ReadyAck, ReadyMsg};
pub mod tracing_setup;
pub use tracing_setup::TracingConfigService;
#[cfg(unix)]
use rapace::transport::shm::{Doorbell, HubPeer};
#[cfg(unix)]
use std::os::unix::io::RawFd;
/// Reports whether quiet mode has been requested through the environment.
///
/// Returns `true` when `RAPACE_QUIET` or `DODECA_QUIET` is set to anything other
/// than the empty string, `0`, or `false` (compared ASCII case-insensitively).
/// `DODECA_QUIET` is only consulted when `RAPACE_QUIET` does not enable quiet mode.
fn quiet_mode_enabled() -> bool {
    // A variable counts as "on" unless it is unset, empty, "0", or "false".
    let flag_is_set = |name: &str| {
        std::env::var_os(name).map_or(false, |raw| {
            let text = raw.to_string_lossy();
            let switched_off =
                text.is_empty() || text == "0" || text.eq_ignore_ascii_case("false");
            !switched_off
        })
    };
    flag_is_set("RAPACE_QUIET") || flag_is_set("DODECA_QUIET")
}
/// SHM session configuration used by the entry points that do not take an explicit
/// config (`run`, `run_with_session`, `run_multi`).
///
/// NOTE(review): `slot_size` is presumably in bytes (64 KiB) — confirm against
/// `ShmSessionConfig`'s own documentation.
pub const DEFAULT_SHM_CONFIG: ShmSessionConfig = ShmSessionConfig {
ring_capacity: 256, slot_size: 65536, slot_count: 128, };
// Channel-start value handed to `RpcSession::with_channel_start` for every cell session.
// NOTE(review): presumably chosen so cell-allocated channel ids do not clash with the
// host's — confirm against the host side.
const CELL_CHANNEL_START: u32 = 2;
/// Errors that can occur while setting up or running a cell.
#[derive(Debug)]
pub enum CellError {
/// The command line did not provide a usable SHM path.
Args(String),
/// The SHM file at this path did not appear within the wait period.
ShmTimeout(PathBuf),
/// The hub file at this path did not appear within the wait period.
HubTimeout(PathBuf),
/// Opening the SHM session failed; carries the `Debug` text of the underlying error.
ShmOpen(String),
/// Opening the hub peer failed; carries the `Debug` text of the underlying error.
HubOpen(String),
/// Hub-mode command-line arguments could not be used (for example a required flag
/// was absent, or hub mode is unavailable on this platform).
HubArgs(String),
/// The doorbell file descriptor was rejected or could not be wrapped.
DoorbellFd(String),
/// An RPC-level error, converted via `From<RpcError>`.
Rpc(RpcError),
/// A transport-level error, converted via `From<TransportError>`.
Transport(TransportError),
}
/// Human-readable rendering of each error variant.
///
/// String and path payloads are shown with `Display`; the wrapped RPC and transport
/// errors are shown with their `Debug` representation.
impl std::fmt::Display for CellError {
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Command-line problems.
            Self::Args(detail) => {
                write!(out, "Argument error: {}", detail)
            }
            Self::HubArgs(detail) => {
                write!(out, "Hub argument error: {}", detail)
            }
            // The host never created the expected file.
            Self::ShmTimeout(missing) => {
                write!(out, "SHM file not created by host: {}", missing.display())
            }
            Self::HubTimeout(missing) => {
                write!(out, "Hub file not created by host: {}", missing.display())
            }
            // The file existed but could not be opened, or the doorbell fd was unusable.
            Self::ShmOpen(detail) => {
                write!(out, "Failed to open SHM: {}", detail)
            }
            Self::HubOpen(detail) => {
                write!(out, "Failed to open hub: {}", detail)
            }
            Self::DoorbellFd(detail) => {
                write!(out, "Doorbell fd error: {}", detail)
            }
            // Errors bubbled up from the RPC session or the transport.
            Self::Rpc(inner) => {
                write!(out, "RPC error: {:?}", inner)
            }
            Self::Transport(inner) => {
                write!(out, "Transport error: {:?}", inner)
            }
        }
    }
}
// `CellError` uses the default `std::error::Error` methods; no `source()` override is
// provided, so wrapped errors are only visible through `Display`/`Debug`.
impl StdError for CellError {}
// Wraps an `RpcError` in `CellError::Rpc`, which lets `?` convert it automatically.
impl From<RpcError> for CellError {
fn from(e: RpcError) -> Self {
Self::Rpc(e)
}
}
// Wraps a `TransportError` in `CellError::Transport`, which lets `?` convert it automatically.
impl From<TransportError> for CellError {
fn from(e: TransportError) -> Self {
Self::Transport(e)
}
}
/// A service that can handle RPC requests identified by a numeric method id.
///
/// Implementors must be `Send + Sync + 'static` so they can be shared by the dispatcher.
pub trait ServiceDispatch: Send + Sync + 'static {
/// Handles one request.
///
/// `method_id` selects the method and `payload` is the raw request body. The returned
/// future is `'static`, so it cannot borrow from `payload`; implementations must copy
/// whatever bytes they need before returning.
///
/// The dispatcher built by `DispatcherBuilder::build` treats an
/// `RpcError::Status` with code `ErrorCode::Unimplemented` as "not handled by this
/// service" and moves on to the next registered service.
fn dispatch(
&self,
method_id: u32,
payload: &[u8],
) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>>;
}
/// Collects [`ServiceDispatch`] implementations and combines them into a single
/// dispatcher closure.
pub struct DispatcherBuilder {
    /// Registered services, tried in insertion order.
    services: Vec<Box<dyn ServiceDispatch>>,
}

impl DispatcherBuilder {
    /// Creates a builder with no services registered.
    pub fn new() -> Self {
        let services = Vec::new();
        Self { services }
    }

    /// Registers `service` after any previously added services and returns the builder.
    pub fn add_service<S>(self, service: S) -> Self
    where
        S: ServiceDispatch,
    {
        let mut builder = self;
        builder.services.push(Box::new(service));
        builder
    }

    /// Registers the default introspection service (requires the `introspection` feature).
    #[cfg(feature = "introspection")]
    pub fn with_introspection(self) -> Self {
        use rapace_introspection::{DefaultServiceIntrospection, ServiceIntrospectionServer};

        /// Adapter that exposes the introspection server through [`ServiceDispatch`].
        struct IntrospectionDispatcher(
            Arc<ServiceIntrospectionServer<DefaultServiceIntrospection>>,
        );

        impl ServiceDispatch for IntrospectionDispatcher {
            fn dispatch(
                &self,
                method_id: u32,
                payload: &[u8],
            ) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send + 'static>>
            {
                // The future must be 'static, so it owns a copy of the payload.
                let server = self.0.clone();
                let owned_payload = payload.to_vec();
                Box::pin(async move { server.dispatch(method_id, &owned_payload).await })
            }
        }

        let server = ServiceIntrospectionServer::new(DefaultServiceIntrospection::new());
        self.add_service(IntrospectionDispatcher(Arc::new(server)))
    }

    /// Consumes the builder and returns the dispatcher closure.
    ///
    /// For each request the services are tried in registration order. The first result
    /// that is not an `Unimplemented` status is returned (errors propagate as-is); on
    /// success the response's `channel_id` and `msg_id` are copied from the request.
    /// If every service reports `Unimplemented`, an `Unimplemented` status is returned
    /// whose message says whether the method id is known to the global registry.
    #[allow(clippy::type_complexity)]
    pub fn build(
        self,
    ) -> impl Fn(Frame) -> Pin<Box<dyn Future<Output = Result<Frame, RpcError>> + Send>>
           + Send
           + Sync
           + 'static {
        let shared_services = Arc::new(self.services);
        move |request: Frame| {
            let services = shared_services.clone();
            Box::pin(async move {
                let method_id = request.desc.method_id;
                let payload = request.payload_bytes();

                for service in services.iter() {
                    let outcome = service.dispatch(method_id, payload).await;
                    match outcome {
                        // This service does not know the method: try the next one.
                        Err(RpcError::Status {
                            code: rapace::ErrorCode::Unimplemented,
                            ..
                        }) => continue,
                        // Anything else (success or a real error) is final.
                        handled => {
                            let mut response = handled?;
                            response.desc.channel_id = request.desc.channel_id;
                            response.desc.msg_id = request.desc.msg_id;
                            return Ok(response);
                        }
                    }
                }

                // No service claimed the method; build a descriptive message.
                let error_msg = rapace_registry::ServiceRegistry::with_global(|reg| {
                    match reg.method_by_id(rapace_registry::MethodId(method_id)) {
                        Some(method) => format!(
                            "Method '{}' (id={}) exists in registry but is not implemented by any service in this cell",
                            method.full_name, method_id
                        ),
                        None => format!(
                            "Unknown method_id: {} (not registered in global registry)",
                            method_id
                        ),
                    }
                });
                Err(RpcError::Status {
                    code: rapace::ErrorCode::Unimplemented,
                    message: error_msg,
                })
            })
        }
    }
}

impl Default for DispatcherBuilder {
    /// Equivalent to [`DispatcherBuilder::new`].
    fn default() -> Self {
        DispatcherBuilder::new()
    }
}
// Result of command-line parsing: which connection mode the cell was launched in.
enum ParsedArgs {
// Pair mode: the cell was given the path of an SHM file.
Pair {
shm_path: PathBuf,
},
// Hub mode (unix only): the cell was given a hub file path, its peer id, and a
// doorbell file descriptor.
#[cfg(unix)]
Hub {
hub_path: PathBuf,
peer_id: u16,
doorbell_fd: RawFd,
},
}
/// Parses the process's command-line arguments into a [`ParsedArgs`] value.
///
/// Recognized arguments:
/// - `--shm-path=PATH` or a bare first positional argument: pair mode.
/// - `--hub-path=PATH` together with `--peer-id=N` and `--doorbell-fd=FD`: hub mode
///   (unix only). Hub mode takes precedence when both paths are given.
///
/// Unrecognized `--` arguments are ignored.
///
/// # Errors
/// - [`CellError::HubArgs`] when hub mode is requested but `--peer-id` or
///   `--doorbell-fd` is missing or not a valid number, or on non-unix platforms.
/// - [`CellError::Args`] when neither a hub path nor an SHM path was supplied.
fn parse_args() -> Result<ParsedArgs, CellError> {
    let mut shm_path: Option<PathBuf> = None;
    let mut hub_path: Option<PathBuf> = None;
    // For the numeric flags, keep the raw text when parsing fails so that hub mode can
    // tell "flag absent" apart from "flag present but malformed". Outside hub mode a
    // malformed value is still ignored, as before.
    let mut peer_id: Option<Result<u16, String>> = None;
    #[cfg(unix)]
    let mut doorbell_fd: Option<Result<RawFd, String>> = None;
    for arg in std::env::args().skip(1) {
        if let Some(value) = arg.strip_prefix("--shm-path=") {
            shm_path = Some(PathBuf::from(value));
        } else if let Some(value) = arg.strip_prefix("--hub-path=") {
            hub_path = Some(PathBuf::from(value));
        } else if let Some(value) = arg.strip_prefix("--peer-id=") {
            peer_id = Some(value.parse::<u16>().map_err(|_| value.to_string()));
        } else if let Some(value) = arg.strip_prefix("--doorbell-fd=") {
            #[cfg(unix)]
            {
                doorbell_fd = Some(value.parse::<i32>().map_err(|_| value.to_string()));
            }
        } else if !arg.starts_with("--") && shm_path.is_none() && hub_path.is_none() {
            // First bare argument is treated as the SHM path.
            shm_path = Some(PathBuf::from(arg));
        }
    }
    if let Some(hub_path) = hub_path {
        #[cfg(not(unix))]
        {
            return Err(CellError::HubArgs(
                "hub mode is only supported on unix platforms".to_string(),
            ));
        }
        #[cfg(unix)]
        {
            let peer_id = match peer_id {
                Some(Ok(id)) => id,
                Some(Err(raw)) => {
                    return Err(CellError::HubArgs(format!(
                        "Invalid --peer-id value for hub mode: {raw:?}"
                    )));
                }
                None => {
                    return Err(CellError::HubArgs(
                        "Missing --peer-id for hub mode".to_string(),
                    ));
                }
            };
            let doorbell_fd = match doorbell_fd {
                Some(Ok(fd)) => fd,
                Some(Err(raw)) => {
                    return Err(CellError::HubArgs(format!(
                        "Invalid --doorbell-fd value for hub mode: {raw:?}"
                    )));
                }
                None => {
                    return Err(CellError::HubArgs(
                        "Missing --doorbell-fd for hub mode".to_string(),
                    ));
                }
            };
            return Ok(ParsedArgs::Hub {
                hub_path,
                peer_id,
                doorbell_fd,
            });
        }
    }
    if let Some(shm_path) = shm_path {
        return Ok(ParsedArgs::Pair { shm_path });
    }
    Err(CellError::Args(
        "Missing SHM path (use --shm-path=PATH or provide as first argument)".to_string(),
    ))
}
/// Polls until the SHM file at `path` exists.
///
/// Performs `max(1, timeout_ms / 100)` existence checks, sleeping 100 ms between
/// consecutive checks. The path is always checked at least once, so a timeout below
/// 100 ms (including 0) still succeeds when the file is already present.
///
/// # Errors
/// Returns [`CellError::ShmTimeout`] carrying `path` if no check finds the file.
async fn wait_for_shm(path: &std::path::Path, timeout_ms: u64) -> Result<(), CellError> {
    const POLL_INTERVAL_MS: u64 = 100;
    // Integer division yields 0 for short timeouts; clamp so we never skip the check.
    let attempts = (timeout_ms / POLL_INTERVAL_MS).max(1);
    for attempt in 0..attempts {
        if path.exists() {
            return Ok(());
        }
        // No point sleeping after the final check.
        if attempt + 1 < attempts {
            tokio::time::sleep(std::time::Duration::from_millis(POLL_INTERVAL_MS)).await;
        }
    }
    Err(CellError::ShmTimeout(path.to_path_buf()))
}
/// Polls until the hub file at `path` exists.
///
/// Performs `max(1, timeout_ms / 100)` existence checks, sleeping 100 ms between
/// consecutive checks. The path is always checked at least once, so a timeout below
/// 100 ms (including 0) still succeeds when the file is already present.
///
/// # Errors
/// Returns [`CellError::HubTimeout`] carrying `path` if no check finds the file.
async fn wait_for_hub(path: &std::path::Path, timeout_ms: u64) -> Result<(), CellError> {
    const POLL_INTERVAL_MS: u64 = 100;
    // Integer division yields 0 for short timeouts; clamp so we never skip the check.
    let attempts = (timeout_ms / POLL_INTERVAL_MS).max(1);
    for attempt in 0..attempts {
        if path.exists() {
            return Ok(());
        }
        // No point sleeping after the final check.
        if attempt + 1 < attempts {
            tokio::time::sleep(std::time::Duration::from_millis(POLL_INTERVAL_MS)).await;
        }
    }
    Err(CellError::HubTimeout(path.to_path_buf()))
}
/// Checks that `fd` refers to an open file descriptor by querying its status flags.
///
/// # Errors
/// Returns [`CellError::DoorbellFd`] with the OS error text when `fcntl(F_GETFL)` fails.
#[cfg(unix)]
fn validate_doorbell_fd(fd: RawFd) -> Result<(), CellError> {
    // SAFETY: `F_GETFL` only queries descriptor flags; it takes no pointer arguments and
    // reports an invalid descriptor through its return value.
    let status = unsafe { libc::fcntl(fd, libc::F_GETFL) };
    if status >= 0 {
        return Ok(());
    }
    // Read errno immediately, before anything else can overwrite it.
    let os_error = std::io::Error::last_os_error();
    Err(CellError::DoorbellFd(format!(
        "doorbell fd {fd} is invalid: {}",
        os_error
    )))
}
/// Derives a name for this cell from the running executable.
///
/// Returns the executable's file stem (lossily converted to UTF-8), or `"cell"` when
/// the executable path is unavailable or has no file stem.
fn cell_name_guess() -> String {
    if let Ok(exe_path) = std::env::current_exe() {
        if let Some(stem) = exe_path.file_stem() {
            return stem.to_string_lossy().into_owned();
        }
    }
    "cell".to_string()
}
// Everything `setup_cell` produces for the `run_*` entry points.
struct CellSetup {
// The RPC session created over the SHM transport.
session: Arc<RpcSession>,
// The SHM file path (pair mode) or hub file path (hub mode).
#[allow(dead_code)]
path: PathBuf,
// The hub peer id; `None` in pair mode.
peer_id: Option<u16>,
}
/// Parses the command line, waits for the host-created file, and builds the RPC session.
///
/// In pair mode the SHM file is opened with `config`. In hub mode (unix only) the
/// doorbell fd is validated, the hub peer is opened and registered, and the transport
/// is built from the peer, the doorbell, and the guessed cell name.
///
/// # Errors
/// Propagates argument, timeout, open, and doorbell failures as [`CellError`].
async fn setup_cell(config: ShmSessionConfig) -> Result<CellSetup, CellError> {
    // How long to wait for the host to create the SHM or hub file, in milliseconds.
    const HOST_FILE_WAIT_MS: u64 = 5000;

    // NOTE(review): presumably ties this process's lifetime to its parent — confirm
    // against the `ur_taking_me_with_you` crate.
    ur_taking_me_with_you::die_with_parent();

    let parsed = parse_args()?;
    let (transport, path, peer_id) = match parsed {
        ParsedArgs::Pair { shm_path } => {
            wait_for_shm(&shm_path, HOST_FILE_WAIT_MS).await?;
            let opened = ShmSession::open_file(&shm_path, config)
                .map_err(|err| CellError::ShmOpen(format!("{:?}", err)))?;
            (Transport::Shm(ShmTransport::new(opened)), shm_path, None)
        }
        #[cfg(unix)]
        ParsedArgs::Hub {
            hub_path,
            peer_id,
            doorbell_fd,
        } => {
            wait_for_hub(&hub_path, HOST_FILE_WAIT_MS).await?;
            validate_doorbell_fd(doorbell_fd)?;
            let hub_peer = HubPeer::open(&hub_path, peer_id)
                .map_err(|err| CellError::HubOpen(format!("{:?}", err)))?;
            hub_peer.register();
            let bell = Doorbell::from_raw_fd(doorbell_fd)
                .map_err(|err| CellError::DoorbellFd(format!("{:?}", err)))?;
            let hub_transport = ShmTransport::hub_peer(Arc::new(hub_peer), bell, cell_name_guess());
            (Transport::Shm(hub_transport), hub_path, Some(peer_id))
        }
    };

    // Both modes wrap their transport in a session with the same channel start.
    let session = Arc::new(RpcSession::with_channel_start(
        transport,
        CELL_CHANNEL_START,
    ));
    Ok(CellSetup {
        session,
        path,
        peer_id,
    })
}
/// Runs a cell hosting `service` using [`DEFAULT_SHM_CONFIG`].
///
/// Thin wrapper that forwards to [`run_with_config`]; see that function for the
/// behavior and error conditions.
pub async fn run<S>(service: S) -> Result<(), CellError>
where
S: ServiceDispatch,
{
run_with_config(service, DEFAULT_SHM_CONFIG).await
}
/// Runs a cell hosting `service` over a session configured with `config`.
///
/// Steps, in order: set up the session, install a dispatcher made of the tracing-config
/// service followed by `service`, spawn the session's run loop, perform the ready
/// handshake when a peer id is present (hub mode), install the tracing layer, and then
/// wait for the run loop to finish.
///
/// A failed ready handshake is reported on stderr (unless quiet mode is enabled) but
/// does not abort the cell.
///
/// # Errors
/// Returns setup errors, the run loop's error, or a transport I/O error when the run
/// task cannot be joined.
pub async fn run_with_config<S>(service: S, config: ShmSessionConfig) -> Result<(), CellError>
where
    S: ServiceDispatch,
{
    let setup = setup_cell(config).await?;
    let session = setup.session;
    let peer_id = setup.peer_id;
    let cell_name = cell_name_guess();
    let (tracing_filter, tracing_service) = tracing_setup::create_tracing_config_service();

    // The tracing-config service is consulted before the user service.
    let dispatcher = DispatcherBuilder::new()
        .add_service(tracing_service)
        .add_service(service)
        .build();
    session.set_dispatcher(dispatcher);

    let run_task = tokio::spawn({
        let session = session.clone();
        async move { session.run().await }
    });

    // NOTE(review): presumably gives the spawned run loop a chance to start before the
    // handshake is sent — confirm.
    for _round in 0..10 {
        tokio::task::yield_now().await;
    }

    if let Some(peer_id) = peer_id {
        let lifecycle = CellLifecycleClient::new(session.clone());
        let ready_msg = ReadyMsg {
            peer_id,
            cell_name: cell_name.clone(),
            pid: Some(std::process::id()),
            version: None,
            features: vec![],
        };
        if !quiet_mode_enabled() {
            eprintln!(
                "[rapace-cell] {} (peer_id={}) sending ready signal...",
                cell_name, peer_id
            );
        }
        let handshake = ready_handshake_with_backoff(&lifecycle, ready_msg).await;
        // Handshake failure is logged only; the cell keeps running either way.
        if !quiet_mode_enabled() {
            match handshake {
                Ok(ack) => eprintln!(
                    "[rapace-cell] {} (peer_id={}) ready acknowledged: ok={}",
                    cell_name, peer_id, ack.ok
                ),
                Err(e) => eprintln!(
                    "[rapace-cell] {} (peer_id={}) ready FAILED: {:?}",
                    cell_name, peer_id, e
                ),
            }
        }
    }

    tracing_setup::install_tracing_layer(session.clone(), tracing_filter);
    tracing::debug!(target: "cell", cell = %cell_name, "Connected to host via SHM: {}", setup.path.display());

    match run_task.await {
        Err(join_err) => Err(CellError::Transport(TransportError::Io(
            std::io::Error::other(format!("demux task join error: {join_err}")),
        ))),
        Ok(result) => Ok(result?),
    }
}
/// Runs a cell whose service is built by `factory`, using [`DEFAULT_SHM_CONFIG`].
///
/// Thin wrapper that forwards to [`run_with_session_and_config`]; see that function
/// for the behavior and error conditions.
pub async fn run_with_session<F, S>(factory: F) -> Result<(), CellError>
where
F: FnOnce(Arc<RpcSession>) -> S,
S: ServiceDispatch,
{
run_with_session_and_config(factory, DEFAULT_SHM_CONFIG).await
}
/// Runs a cell whose service is built from the session, using `config`.
///
/// `factory` is called once with a clone of the session handle and returns the service
/// to host. The rest mirrors the single-service flow: install a dispatcher made of the
/// tracing-config service followed by the service, spawn the session's run loop,
/// perform the ready handshake when a peer id is present (hub mode), install the
/// tracing layer, and wait for the run loop to finish.
///
/// A failed ready handshake is reported on stderr (unless quiet mode is enabled) but
/// does not abort the cell.
///
/// # Errors
/// Returns setup errors, the run loop's error, or a transport I/O error when the run
/// task cannot be joined.
pub async fn run_with_session_and_config<F, S>(
    factory: F,
    config: ShmSessionConfig,
) -> Result<(), CellError>
where
    F: FnOnce(Arc<RpcSession>) -> S,
    S: ServiceDispatch,
{
    let setup = setup_cell(config).await?;
    let session = setup.session;
    let peer_id = setup.peer_id;
    let cell_name = cell_name_guess();

    // Give the factory its own handle so the service can issue calls on the session.
    let service = factory(session.clone());
    let (tracing_filter, tracing_service) = tracing_setup::create_tracing_config_service();

    // The tracing-config service is consulted before the user service.
    let dispatcher = DispatcherBuilder::new()
        .add_service(tracing_service)
        .add_service(service)
        .build();
    session.set_dispatcher(dispatcher);

    let run_task = tokio::spawn({
        let session = session.clone();
        async move { session.run().await }
    });

    // NOTE(review): presumably gives the spawned run loop a chance to start before the
    // handshake is sent — confirm.
    for _round in 0..10 {
        tokio::task::yield_now().await;
    }

    if let Some(peer_id) = peer_id {
        let lifecycle = CellLifecycleClient::new(session.clone());
        let ready_msg = ReadyMsg {
            peer_id,
            cell_name: cell_name.clone(),
            pid: Some(std::process::id()),
            version: None,
            features: vec![],
        };
        if !quiet_mode_enabled() {
            eprintln!(
                "[rapace-cell] {} (peer_id={}) sending ready signal...",
                cell_name, peer_id
            );
        }
        let handshake = ready_handshake_with_backoff(&lifecycle, ready_msg).await;
        // Handshake failure is logged only; the cell keeps running either way.
        if !quiet_mode_enabled() {
            match handshake {
                Ok(ack) => eprintln!(
                    "[rapace-cell] {} (peer_id={}) ready acknowledged: ok={}",
                    cell_name, peer_id, ack.ok
                ),
                Err(e) => eprintln!(
                    "[rapace-cell] {} (peer_id={}) ready FAILED: {:?}",
                    cell_name, peer_id, e
                ),
            }
        }
    }

    tracing_setup::install_tracing_layer(session.clone(), tracing_filter);
    tracing::debug!(target: "cell", cell = %cell_name, "Connected to host via SHM: {}", setup.path.display());

    match run_task.await {
        Err(join_err) => Err(CellError::Transport(TransportError::Io(
            std::io::Error::other(format!("demux task join error: {join_err}")),
        ))),
        Ok(result) => Ok(result?),
    }
}
/// Returns the total time budget for the ready handshake.
///
/// The value is read, in milliseconds, from `RAPACE_CELL_READY_TIMEOUT_MS`; if that is
/// unset or not a valid `u64`, `DODECA_CELL_READY_TIMEOUT_MS` is tried; if neither
/// yields a number the budget is 10 000 ms.
fn ready_total_timeout() -> std::time::Duration {
    const FALLBACK_MS: u64 = 10_000;
    // Variables are consulted lazily and in priority order; the first that parses wins.
    let candidates = [
        "RAPACE_CELL_READY_TIMEOUT_MS",
        "DODECA_CELL_READY_TIMEOUT_MS",
    ];
    let millis = candidates
        .iter()
        .find_map(|name| std::env::var(name).ok()?.parse::<u64>().ok())
        .unwrap_or(FALLBACK_MS);
    std::time::Duration::from_millis(millis)
}
/// Sends the ready message, retrying with exponential backoff until it is acknowledged.
///
/// Each failed attempt is followed by a sleep that starts at 10 ms and doubles up to a
/// 200 ms cap. The time budget from `ready_total_timeout` is only checked after a
/// failed attempt, so an individual `ready` call is not itself bounded by it.
///
/// # Errors
/// Returns the most recent error once an attempt fails with the budget already spent.
async fn ready_handshake_with_backoff(
    client: &CellLifecycleClient,
    msg: ReadyMsg,
) -> Result<ReadyAck, RpcError> {
    let budget = ready_total_timeout();
    let began = std::time::Instant::now();
    let mut backoff_ms: u64 = 10;
    loop {
        match client.ready(msg.clone()).await {
            Ok(ack) => break Ok(ack),
            // Out of time: surface the failure we just saw.
            Err(failure) if began.elapsed() >= budget => break Err(failure),
            Err(failure) => {
                tracing::debug!(
                    cell = %msg.cell_name,
                    peer_id = msg.peer_id,
                    error = ?failure,
                    delay_ms = backoff_ms,
                    "Ready handshake failed; retrying"
                );
            }
        }
        tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
        backoff_ms = (backoff_ms * 2).min(200);
    }
}
/// Runs a cell hosting several services, using [`DEFAULT_SHM_CONFIG`].
///
/// Thin wrapper that forwards to [`run_multi_with_config`]; see that function for the
/// behavior and error conditions.
pub async fn run_multi<F>(builder_fn: F) -> Result<(), CellError>
where
F: FnOnce(DispatcherBuilder) -> DispatcherBuilder,
{
run_multi_with_config(builder_fn, DEFAULT_SHM_CONFIG).await
}
/// Runs a cell hosting the services registered by `builder_fn`, using `config`.
///
/// `builder_fn` receives an empty [`DispatcherBuilder`] and returns it with the user's
/// services added; the tracing-config service is then appended after them. The rest of
/// the flow: spawn the session's run loop, perform the ready handshake when a peer id
/// is present (hub mode), install the tracing layer, and wait for the run loop to
/// finish.
///
/// A failed ready handshake is reported on stderr (unless quiet mode is enabled) but
/// does not abort the cell.
///
/// # Errors
/// Returns setup errors, the run loop's error, or a transport I/O error when the run
/// task cannot be joined.
pub async fn run_multi_with_config<F>(
    builder_fn: F,
    config: ShmSessionConfig,
) -> Result<(), CellError>
where
    F: FnOnce(DispatcherBuilder) -> DispatcherBuilder,
{
    let setup = setup_cell(config).await?;
    let session = setup.session;
    let peer_id = setup.peer_id;
    let cell_name = cell_name_guess();
    let (tracing_filter, tracing_service) = tracing_setup::create_tracing_config_service();

    // User services come first here; the tracing-config service is tried last.
    let dispatcher = builder_fn(DispatcherBuilder::new())
        .add_service(tracing_service)
        .build();
    session.set_dispatcher(dispatcher);

    let run_task = tokio::spawn({
        let session = session.clone();
        async move { session.run().await }
    });

    // NOTE(review): presumably gives the spawned run loop a chance to start before the
    // handshake is sent — confirm.
    for _round in 0..10 {
        tokio::task::yield_now().await;
    }

    if let Some(peer_id) = peer_id {
        let lifecycle = CellLifecycleClient::new(session.clone());
        let ready_msg = ReadyMsg {
            peer_id,
            cell_name: cell_name.clone(),
            pid: Some(std::process::id()),
            version: None,
            features: vec![],
        };
        if !quiet_mode_enabled() {
            eprintln!(
                "[rapace-cell] {} (peer_id={}) sending ready signal...",
                cell_name, peer_id
            );
        }
        let handshake = ready_handshake_with_backoff(&lifecycle, ready_msg).await;
        // Handshake failure is logged only; the cell keeps running either way.
        if !quiet_mode_enabled() {
            match handshake {
                Ok(ack) => eprintln!(
                    "[rapace-cell] {} (peer_id={}) ready acknowledged: ok={}",
                    cell_name, peer_id, ack.ok
                ),
                Err(e) => eprintln!(
                    "[rapace-cell] {} (peer_id={}) ready FAILED: {:?}",
                    cell_name, peer_id, e
                ),
            }
        }
    }

    tracing_setup::install_tracing_layer(session.clone(), tracing_filter);
    tracing::debug!(target: "cell", cell = %cell_name, "Connected to host via SHM: {}", setup.path.display());

    match run_task.await {
        Err(join_err) => Err(CellError::Transport(TransportError::Io(
            std::io::Error::other(format!("demux task join error: {join_err}")),
        ))),
        Ok(result) => Ok(result?),
    }
}
/// Extension trait for installing a [`ServiceDispatch`] implementation on a session.
pub trait RpcSessionExt {
/// Installs `service` as the handler for incoming requests.
fn set_service<S>(&self, service: S)
where
S: ServiceDispatch;
}
impl RpcSessionExt for RpcSession {
    /// Installs `service` as this session's dispatcher.
    ///
    /// Every request is forwarded to `service.dispatch`; on success the response's
    /// `channel_id` and `msg_id` are overwritten with the request's values so the reply
    /// is routed back to the caller. Errors from the service are returned unchanged.
    fn set_service<S>(&self, service: S)
    where
        S: ServiceDispatch,
    {
        let shared = Arc::new(service);
        self.set_dispatcher(move |request: Frame| {
            // Each in-flight request holds its own handle to the service.
            let handler = shared.clone();
            Box::pin(async move {
                let method_id = request.desc.method_id;
                let mut response = handler
                    .dispatch(method_id, request.payload_bytes())
                    .await?;
                response.desc.channel_id = request.desc.channel_id;
                response.desc.msg_id = request.desc.msg_id;
                Ok(response)
            })
        });
    }
}
/// Generates a `main` function that runs a single-service cell.
///
/// Expands to an `async fn main` annotated with
/// `#[tokio::main(flavor = "current_thread")]` that awaits `$crate::run($service)` and
/// propagates any error as `Box<dyn std::error::Error>`.
#[macro_export]
macro_rules! run_cell {
($service:expr) => {
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
$crate::run($service).await?;
Ok(())
}
};
}
/// Generates a `main` function that runs a cell whose service is built from the session.
///
/// Expands to an `async fn main` annotated with
/// `#[tokio::main(flavor = "current_thread")]` that awaits
/// `$crate::run_with_session($factory)` and propagates any error as
/// `Box<dyn std::error::Error>`.
#[macro_export]
macro_rules! run_cell_with_session {
($factory:expr) => {
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
$crate::run_with_session($factory).await?;
Ok(())
}
};
}
/// Generates a `CellService` wrapper that adapts a generated server type to
/// `ServiceDispatch`.
///
/// `$server_type` is the server type and `$impl_type` is the implementation it wraps.
/// The expansion defines:
/// - `struct CellService(Arc<$server_type>)` — the generated type is always named
///   `CellService`;
/// - `impl ServiceDispatch for CellService`, which copies the payload and forwards to
///   the server's `dispatch`;
/// - `impl From<$impl_type> for CellService`, which builds the server with
///   `<$server_type>::new(impl_val)`.
#[macro_export]
macro_rules! cell_service {
($server_type:ty, $impl_type:ty) => {
struct CellService(std::sync::Arc<$server_type>);
impl $crate::ServiceDispatch for CellService {
fn dispatch(
&self,
method_id: u32,
payload: &[u8],
) -> std::pin::Pin<
Box<
dyn std::future::Future<
Output = std::result::Result<$crate::Frame, $crate::RpcError>,
> + Send
+ 'static,
>,
> {
// The returned future is 'static, so it needs its own handle to the server
// and an owned copy of the payload.
let server = self.0.clone();
let bytes = payload.to_vec();
Box::pin(async move { server.dispatch(method_id, &bytes).await })
}
}
impl From<$impl_type> for CellService {
fn from(impl_val: $impl_type) -> Self {
Self(std::sync::Arc::new(<$server_type>::new(impl_val)))
}
}
};
}