#[allow(unused)]
use tracing::{debug, error, info, warn};
use error_stack::{IntoReport, Result, ResultExt};
use serde::{de::DeserializeOwned, Serialize};
use {
super::{InfoCon, InfomgrError},
ipcon_sys::ipcon::{
Ipcon, IPCON_KERNEL_GROUP_NAME, IPCON_KERNEL_NAME, IPF_DISABLE_KEVENT_FILTER, IPF_RCV_IF,
IPF_SND_IF,
},
ipcon_sys::ipcon_msg::{IpconMsg, IpconMsgType},
std::sync::mpsc::{Receiver, Sender},
std::sync::Arc,
std::thread::JoinHandle,
};
/// State for the IPCON-backed [`InfoCon`] implementation.
///
/// `Default` yields an instance with no worker thread recorded.
#[derive(Default)]
pub struct IpconInfomgr {
/// Join handle of the worker thread spawned by the most recent `sender` or
/// `receiver` call; `None` until one of them has been called. Each call
/// overwrites the previously stored handle.
handler: Option<JoinHandle<Result<(), InfomgrError>>>,
}
impl InfoCon for IpconInfomgr {
fn sender<T>(&mut self, source: Arc<String>, receiver: Receiver<T>) -> Result<(), InfomgrError>
where
T: 'static + Serialize + Send,
{
let handler = std::thread::spawn(move || -> Result<(), InfomgrError> {
let (group, peer) = source
.split_once('@')
.ok_or(InfomgrError::InvalidData)
.into_report()
.attach_printable(format!("Invalid group {}", source))?;
let ih = Ipcon::new(Some(peer), Some(IPF_SND_IF))
.change_context(InfomgrError::IpconError)?;
ih.register_group(group)
.change_context(InfomgrError::IpconError)?;
loop {
let report = receiver
.recv()
.into_report()
.change_context(InfomgrError::IOError)?;
let buf = serde_json::to_string(&report)
.into_report()
.change_context(InfomgrError::InvalidData)?;
ih.send_multicast(group, buf.as_bytes(), false)
.change_context(InfomgrError::IpconError)?;
}
});
self.handler = Some(handler);
Ok(())
}
fn receiver<T>(
&mut self,
source: Arc<String>,
sender: Sender<Option<T>>,
) -> Result<(), InfomgrError>
where
T: 'static + DeserializeOwned + Send,
{
let handler = std::thread::spawn(move || -> Result<(), InfomgrError> {
let (group, peer) = source
.split_once('@')
.ok_or(InfomgrError::InvalidData)
.into_report()
.attach_printable(format!("Invalid group {}", source))?;
let matched = |p: &str, g: &str| -> bool { p == peer && g == group };
let ih = Ipcon::new(None, Some(IPF_RCV_IF | IPF_DISABLE_KEVENT_FILTER))
.change_context(InfomgrError::IpconError)?;
ih.join_group(IPCON_KERNEL_NAME, IPCON_KERNEL_GROUP_NAME)
.change_context(InfomgrError::IpconError)?;
if ih.join_group(peer, group).is_ok() {
debug!("Infomgr: {}@{} connected", group, peer);
}
loop {
let msg = ih.receive_msg().change_context(InfomgrError::IOError)?;
match msg {
IpconMsg::IpconMsgKevent(kevent) => {
if let Some((p, g)) = kevent.group_added() {
if matched(&p, &g) && ih.join_group(&p, &g).is_ok() {
debug!("Infomgr: {}@{} connected", g, p);
}
continue;
}
if let Some((p, g)) = kevent.group_removed() {
if matched(&p, &g) {
debug!("Infomgr {}@{} lost.", g, p);
sender
.send(None)
.map_err(|_| InfomgrError::IOError)
.into_report()?;
continue;
}
}
}
IpconMsg::IpconMsgUser(m)
if matches!(m.msg_type, IpconMsgType::IpconMsgTypeGroup) =>
{
let group = m.group.unwrap();
if matched(&m.peer, &group) {
if let Ok(s) = std::str::from_utf8(&m.buf) {
if let Ok(ir) = serde_json::from_str(s) {
debug!("Infomgr report received from {}@{}.", group, m.peer);
sender
.send(Some(ir))
.map_err(|_| InfomgrError::IOError)
.into_report()?;
}
}
}
}
_ => {}
}
}
});
self.handler = Some(handler);
Ok(())
}
}