use crate::transport::Transport;
use crate::{Error, Message, Result};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::Mutex;
use type_uuid::TypeUuid;
#[derive(Debug, Serialize, Deserialize, TypeUuid)]
#[uuid = "7b07473e-9659-4d47-a502-8245d71c0078"]
struct IpmbMessage {
inner: Message,
}
pub struct IpmbTransport {
sender: ipmb::EndpointSender<IpmbMessage>,
receiver: Arc<Mutex<ipmb::EndpointReceiver<IpmbMessage>>>,
process_name: String,
}
impl IpmbTransport {
pub async fn new(process_name: &str) -> Result<Self> {
let options = ipmb::Options::new("com.hsipc.bus", ipmb::label!(process_name), "");
let (sender, receiver) = ipmb::join::<IpmbMessage, IpmbMessage>(options, None)
.map_err(|e| Error::transport_msg(format!("IPMB join failed: {e}")))?;
tracing::info!("🚌 Joined IPMB bus as process: {}", process_name);
Ok(Self {
sender,
receiver: Arc::new(Mutex::new(receiver)),
process_name: process_name.to_string(),
})
}
}
#[async_trait]
impl Transport for IpmbTransport {
async fn send(&self, msg: Message) -> Result<()> {
let ipmb_msg = IpmbMessage { inner: msg.clone() };
let selector = if let Some(ref target) = msg.target {
ipmb::Selector::unicast(ipmb::LabelOp::from(target.as_str()))
} else {
ipmb::Selector::multicast(ipmb::LabelOp::True)
};
let ipmb_message = ipmb::Message::new(selector, ipmb_msg);
match self.sender.send(ipmb_message) {
Ok(()) => {
tracing::debug!("📤 Sent IPMB message to {:?}", msg.target);
Ok(())
}
Err(e) => {
let error_msg = format!("IPMB send failed: {e}");
if error_msg.contains("Invalid argument") {
tracing::warn!("🚨 IPMB send warning (non-fatal): {}", error_msg);
Ok(()) } else {
Err(Error::transport_msg(error_msg))
}
}
}
}
async fn recv(&self) -> Result<Message> {
let receiver = self.receiver.clone();
tokio::task::spawn_blocking(move || {
let mut receiver = tokio::runtime::Handle::current().block_on(receiver.lock());
let timeout = std::time::Duration::from_secs(30);
match receiver.recv(Some(timeout)) {
Ok(ipmb_msg) => Ok(ipmb_msg.payload.inner),
Err(e) => Err(Error::transport_msg(format!("IPMB recv failed: {e}"))),
}
})
.await
.map_err(|e| Error::runtime("async recv failed", e))?
}
async fn close(&self) -> Result<()> {
tracing::info!("🚌 Closing IPMB transport for {}", self.process_name);
Ok(())
}
}