use std::{
any::TypeId,
time::{Duration, SystemTime},
};
use bitcode::{Decode, Encode};
use pipeworks_core::{
bus::Bus,
channel::SerdePayload,
event::BusEvent,
node_id::{Id, NodeId},
reg::{BusHelpers, BusTypeTrace},
};
use pipeworks_derive::{BusLocal, BusShared, register_generic};
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt as _};
#[derive(BusShared, Clone, Debug, Encode, Decode)]
#[bus_ctl(buffer_cap = 2)]
pub struct Normy {
pub foo: i32,
}
#[derive(BusLocal, Clone, Debug)]
pub struct TemplateType<T>
where
T: Clone + Send + Sync + 'static,
{
_value: T,
}
type ConcreteTemplateI32 = TemplateType<i32>;
register_generic!(ConcreteTemplateI32);
#[tokio::main]
pub async fn main() {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().pretty())
.init();
let _implicit_global_bus = Bus::get_default();
Normy::bus_trace();
tokio::spawn(async move {
let mut rx = Bus::get_default().subscribe_serde(&TypeId::of::<Normy>());
while let Ok(event) = rx.recv().await {
info!(
"Serde bytes: {}",
match event.msg {
pipeworks_core::channel::SerdePayload::Bitcode(items) => items.len(),
}
);
}
});
tokio::time::sleep(Duration::from_millis(1)).await;
for i in 0..4 {
Normy { foo: i }.bus_send();
}
let fake_node = NodeId {
node: Id::new(),
machine: Id::new(),
};
Bus::get_default().send_serde_event(
&TypeId::of::<Normy>(),
BusEvent {
time: SystemTime::now(),
source: fake_node.clone(),
msg: SerdePayload::Bitcode(bitcode::encode(&Normy { foo: 99 })),
},
);
tokio::time::sleep(Duration::from_millis(50)).await;
}