hsipc 0.1.3

High-performance inter-process communication framework in Rust
Documentation
//! Real IPMB-based transport for cross-process communication

use crate::transport::Transport;
use crate::{Error, Message, Result};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;
use type_uuid::TypeUuid;

/// IPMB message wrapper for our Message type
#[derive(Debug, Serialize, Deserialize, TypeUuid)]
#[uuid = "7b07473e-9659-4d47-a502-8245d71c0078"]
struct IpmbMessage {
    inner: Message,
}

/// IPMB-based transport implementation
pub struct IpmbTransport {
    sender: ipmb::EndpointSender<IpmbMessage>,
    receiver: Arc<Mutex<ipmb::EndpointReceiver<IpmbMessage>>>,
    process_name: String,
}

impl IpmbTransport {
    /// Create a new IPMB transport
    pub async fn new(process_name: &str) -> Result<Self> {
        // Create IPMB options
        let options = ipmb::Options::new("com.hsipc.bus", ipmb::label!(process_name), "");

        // Join the IPMB bus
        let (sender, receiver) = ipmb::join::<IpmbMessage, IpmbMessage>(options, None)
            .map_err(|e| Error::transport_msg(format!("IPMB join failed: {e}")))?;

        tracing::info!("🚌 Joined IPMB bus as process: {}", process_name);

        Ok(Self {
            sender,
            receiver: Arc::new(Mutex::new(receiver)),
            process_name: process_name.to_string(),
        })
    }
}

#[async_trait]
impl Transport for IpmbTransport {
    async fn send(&self, msg: Message) -> Result<()> {
        let ipmb_msg = IpmbMessage { inner: msg.clone() };

        let selector = if let Some(ref target) = msg.target {
            // Send to specific process using label match
            ipmb::Selector::unicast(ipmb::LabelOp::from(target.as_str()))
        } else {
            // Broadcast to all processes - use multicast with True to match all endpoints
            ipmb::Selector::multicast(ipmb::LabelOp::True)
        };

        let ipmb_message = ipmb::Message::new(selector, ipmb_msg);

        // Try to send with retry on certain errors
        match self.sender.send(ipmb_message) {
            Ok(()) => {
                tracing::debug!("📤 Sent IPMB message to {:?}", msg.target);
                Ok(())
            }
            Err(e) => {
                // Log the error but don't fail completely for "Invalid argument" errors
                // as they seem to be related to IPMB internal socket handling
                let error_msg = format!("IPMB send failed: {e}");
                if error_msg.contains("Invalid argument") {
                    tracing::warn!("🚨 IPMB send warning (non-fatal): {}", error_msg);
                    Ok(()) // Treat as non-fatal for now
                } else {
                    Err(Error::transport_msg(error_msg))
                }
            }
        }
    }

    async fn recv(&self) -> Result<Message> {
        let receiver = self.receiver.clone();

        tokio::task::spawn_blocking(move || {
            let mut receiver = tokio::runtime::Handle::current().block_on(receiver.lock());
            // Use a reasonable timeout to prevent indefinite blocking
            let timeout = std::time::Duration::from_secs(30);
            match receiver.recv(Some(timeout)) {
                Ok(ipmb_msg) => Ok(ipmb_msg.payload.inner),
                Err(e) => Err(Error::transport_msg(format!("IPMB recv failed: {e}"))),
            }
        })
        .await
        .map_err(|e| Error::runtime("async recv failed", e))?
    }

    async fn close(&self) -> Result<()> {
        // IPMB handles cleanup automatically
        tracing::info!("🚌 Closing IPMB transport for {}", self.process_name);
        Ok(())
    }
}