// pipeworks 0.1.0
//
// Robust data processing pipelines
use std::{
    any::TypeId,
    time::{Duration, SystemTime},
};

use bitcode::{Decode, Encode};
use pipeworks_core::{
    bus::Bus,
    channel::SerdePayload,
    event::BusEvent,
    node_id::{Id, NodeId},
    reg::{BusHelpers, BusTypeTrace},
};
use pipeworks_derive::{BusLocal, BusShared, register_generic};
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt as _};

// Register a normy struct as an item that can be sent to the bus, and can be serialized and shared
// with remote nodes. We also set the buffer-cap to 2 (from the default of 32). This is the number
// of internal events that will be buffered in the bus before they are dropped and never delivered
// to the receiver. That is to say, we intentionally 'lag' instead of applying back-pressure. You
// can receive events when something lags and choose what to do with it.
#[derive(BusShared, Clone, Debug, Encode, Decode)]
#[bus_ctl(buffer_cap = 2)]
pub struct Normy {
    // Arbitrary example payload carried by each event.
    pub foo: i32,
}

// Register a generic struct that can be sent to the bus but does not implement Encode/Decode, so
// it cannot be serialized and shared with remote nodes (in either direction).
#[derive(BusLocal, Clone, Debug)]
pub struct TemplateType<T>
where
    // NOTE(review): presumably these bounds are what the `BusLocal` derive needs to move values
    // between tasks -- confirm against the derive's requirements.
    T: Clone + Send + Sync + 'static,
{
    // Private payload slot; nothing in this file reads it.
    _value: T,
}

// The only catch with generic bus types is that each generic specialization you actually want to
// use needs to be manually registered. Here `TemplateType<i32>` is registered through a type
// alias.
// NOTE(review): presumably the alias is needed because the macro expects a plain identifier --
// confirm against `register_generic!`.
type ConcreteTemplateI32 = TemplateType<i32>;
register_generic!(ConcreteTemplateI32);

// Example entry point. In order, it:
//   1. installs a pretty-printing `tracing` subscriber,
//   2. enables bus tracing for `Normy`,
//   3. spawns a task that logs the size of every serialized `Normy` event,
//   4. sends four local `Normy` events in a burst, and
//   5. injects one pre-serialized `Normy` event attributed to a made-up remote node.
#[tokio::main]
pub async fn main() {
    tracing_subscriber::registry()
        .with(tracing_subscriber::fmt::layer().pretty())
        .init();

    // There is an implicit global bus (because most applications will only ever need one Bus).
    // The binding is unused; it is here only to show how the default bus is obtained.
    let _implicit_global_bus = Bus::get_default();

    // Because `Normy` derives `Debug` you can have the bus puke all events of that type out to
    // tracing. Very handy.
    Normy::bus_trace();

    // Spawn a task that receives pre-serialized (and type-erased) copies of all `Normy` events we
    // shove into the bus. You wouldn't actually do this; it's used by `pipeworks_net` to shuffle
    // data over the network.
    tokio::spawn(async move {
        let mut rx = Bus::get_default().subscribe_serde(&TypeId::of::<Normy>());
        // NOTE(review): this loop ends on the first `Err` from `recv()`. If the receiver reports
        // lag as an error, the task stops at the first lagged message -- confirm this is intended.
        while let Ok(event) = rx.recv().await {
            info!(
                "Serde bytes: {}",
                match event.msg {
                    SerdePayload::Bitcode(items) => items.len(),
                }
            );
        }
    });

    // Messages with no listener are dropped, so give the task a moment to start.
    tokio::time::sleep(Duration::from_millis(1)).await;

    // Send 4 messages all at once. This will overflow our cap of 2, resulting in 2 'lagged'
    // messages.
    for i in 0..4 {
        Normy { foo: i }.bus_send();
    }

    // Synthesize an event from a remote (make a fake node id, and send type-erased serialized data
    // "from" it).
    let fake_node = NodeId {
        node: Id::new(),
        machine: Id::new(),
    };
    Bus::get_default().send_serde_event(
        &TypeId::of::<Normy>(),
        BusEvent {
            time: SystemTime::now(),
            // `fake_node` is not used again, so move it instead of cloning.
            source: fake_node,
            msg: SerdePayload::Bitcode(bitcode::encode(&Normy { foo: 99 })),
        },
    );

    // Give the spawned task time to log what it received before the runtime shuts down.
    tokio::time::sleep(Duration::from_millis(50)).await;
}