use std::marker::PhantomData;
use std::sync::mpsc;
use drop_bomb::DropBomb;
use engate_types::{AttachError, Live, Phase, Snapshot, Spawned, Subscribed, Synced};
/// Source side of an attachment.
///
/// `Attach::subscribe` calls [`Producer::subscribe`] first and
/// [`Producer::snapshot`] second; the results are later fed to a [`Consumer`]
/// whose `Item` and `Snap` types are constrained to match this producer's.
pub trait Producer: Send + Sync + 'static {
/// Element type carried by the channel that [`Producer::subscribe`] returns.
type Item: Send + 'static;
/// Value returned by [`Producer::snapshot`]; must implement [`Snapshot`].
type Snap: Snapshot;
/// Returns a snapshot of this producer, or an [`AttachError`] on failure.
fn snapshot(&self) -> Result<Self::Snap, AttachError>;
/// Returns the receiving end of an `mpsc` channel of `Self::Item` values, or
/// an [`AttachError`] on failure.
fn subscribe(&self) -> Result<mpsc::Receiver<Self::Item>, AttachError>;
}
/// Sink side of an attachment.
///
/// `Attach::replay` hands the snapshot to [`Consumer::replay`]; in the live
/// phase, `Attach::run` and `Attach::poll_one` pass each received item to
/// [`Consumer::consume`].
pub trait Consumer: Send + 'static {
/// Element type accepted by [`Consumer::consume`].
type Item: Send + 'static;
/// Snapshot type accepted by [`Consumer::replay`]; must implement [`Snapshot`].
type Snap: Snapshot;
/// Receives the snapshot that was unwrapped from a [`History`].
fn replay(&mut self, snapshot: Self::Snap);
/// Receives one item taken from the subscription channel.
fn consume(&mut self, item: Self::Item);
}
/// A producer snapshot that must be consumed rather than dropped.
///
/// The embedded [`DropBomb`] is armed in `History::new` and defused only by
/// [`History::into_inner`] (which `Attach::replay` calls). Dropping a `History`
/// without defusing it panics with the message configured in `History::new`.
#[must_use = "engate::History must be passed to Attach::replay() — dropping it loses the producer's pre-attach state and reintroduces the bug class engate exists to eliminate"]
pub struct History<S: Snapshot> {
/// The wrapped snapshot value.
snapshot: S,
/// Drop guard; armed on construction and defused by `into_inner`.
bomb: DropBomb,
}
impl<S: Snapshot> History<S> {
    /// Wraps `snapshot` together with a freshly armed drop bomb.
    ///
    /// Private: the only constructor call in this file is in `Attach::subscribe`.
    fn new(snapshot: S) -> Self {
        let bomb = DropBomb::new(
            "engate::History dropped without being consumed — pass it to Attach::replay()",
        );
        Self { snapshot, bomb }
    }

    /// Consumes the history, defusing the bomb and returning the snapshot.
    ///
    /// After this call nothing panics on drop; the caller owns the snapshot.
    pub fn into_inner(self) -> S {
        // Split the wrapper so the bomb can be defused before it goes out of scope.
        let Self { snapshot, mut bomb } = self;
        bomb.defuse();
        snapshot
    }

    /// Reports the wrapped snapshot's `size_bytes()` without consuming it.
    pub fn size_bytes(&self) -> usize {
        self.snapshot.size_bytes()
    }
}
/// Typestate wrapper pairing a producer with a consumer.
///
/// `P` is a phase marker type (`Spawned`, `Subscribed`, `Synced` or `Live` in
/// this file); each phase's `impl` block exposes only the method that advances
/// to the next phase, plus `run`/`poll_one` once `Live`.
pub struct Attach<P: Phase, Prod: Producer, Cons: Consumer> {
/// The producer being attached to.
producer: Prod,
/// The consumer that receives the snapshot and then the live items.
consumer: Cons,
/// Subscription receiver: `None` when first built, `Some` once `subscribe`
/// has succeeded.
rx: Option<mpsc::Receiver<Prod::Item>>,
/// Carries the phase type parameter without storing a value of it.
_phase: PhantomData<P>,
}
/// Builder input for an [`AttachSpawned`].
///
/// The `build_method(into = ...)` attribute makes the derived builder's
/// `build()` return an [`AttachSpawned`] (through the `From` impl) instead of
/// this config struct. The `where` clause ties the consumer's `Item` and `Snap`
/// types to the producer's.
#[derive(typed_builder::TypedBuilder)]
#[builder(build_method(into = AttachSpawned<Prod, Cons>))]
pub struct AttachConfig<Prod, Cons>
where
Prod: Producer,
Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
{
/// Producer to attach to.
pub producer: Prod,
/// Consumer that will receive the producer's snapshot and items.
pub consumer: Cons,
}
/// An [`Attach`] in its initial `Spawned` phase, before `subscribe` has been called.
pub type AttachSpawned<Prod, Cons> = Attach<Spawned, Prod, Cons>;
impl<Prod, Cons> From<AttachConfig<Prod, Cons>> for AttachSpawned<Prod, Cons>
where
Prod: Producer,
Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
{
fn from(c: AttachConfig<Prod, Cons>) -> Self {
Attach {
producer: c.producer,
consumer: c.consumer,
rx: None,
_phase: PhantomData,
}
}
}
impl<Prod, Cons> Attach<Spawned, Prod, Cons>
where
    Prod: Producer,
    Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
{
    /// Entry point for constructing an attachment.
    ///
    /// Returns the typed builder derived for [`AttachConfig`] with neither field
    /// set yet; the builder's `build()` is configured to yield an
    /// [`AttachSpawned`].
    pub fn builder() -> AttachConfigBuilder<Prod, Cons, ((), ())> {
        AttachConfig::<Prod, Cons>::builder()
    }
}
impl<Prod, Cons> Attach<Spawned, Prod, Cons>
where
    Prod: Producer,
    Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
{
    /// Opens the live subscription, then captures the snapshot.
    ///
    /// The receiver is obtained *before* the snapshot is requested, and the
    /// snapshot comes back wrapped in a [`History`] that must be handed to
    /// `replay` on the returned `Subscribed` attachment.
    ///
    /// # Errors
    ///
    /// Propagates the [`AttachError`] from `Producer::subscribe` or
    /// `Producer::snapshot`. A subscribe failure returns before the snapshot is
    /// attempted; a snapshot failure drops the receiver that was just opened.
    pub fn subscribe(
        self,
    ) -> Result<(Attach<Subscribed, Prod, Cons>, History<Prod::Snap>), AttachError> {
        let live_rx = self.producer.subscribe()?;
        let captured = self.producer.snapshot()?;
        tracing::debug!(
            target: "engate::attach",
            snapshot_bytes = captured.size_bytes(),
            "subscribe complete — snapshot captured"
        );
        let history = History::new(captured);
        let subscribed = Attach {
            producer: self.producer,
            consumer: self.consumer,
            rx: Some(live_rx),
            _phase: PhantomData::<Subscribed>,
        };
        Ok((subscribed, history))
    }
}
impl<Prod, Cons> Attach<Subscribed, Prod, Cons>
where
    Prod: Producer,
    Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
{
    /// Feeds the captured snapshot to the consumer and advances to `Synced`.
    ///
    /// Unwrapping `history` defuses its drop bomb. The snapshot's size is read
    /// before the snapshot is moved into `Consumer::replay`, so it can still be
    /// logged afterwards.
    ///
    /// # Errors
    ///
    /// The signature allows an [`AttachError`], but this implementation always
    /// returns `Ok`.
    pub fn replay(
        self,
        history: History<Prod::Snap>,
    ) -> Result<Attach<Synced, Prod, Cons>, AttachError> {
        let snapshot = history.into_inner();
        let bytes = snapshot.size_bytes();

        let mut sink = self.consumer;
        sink.replay(snapshot);
        tracing::debug!(target: "engate::attach", bytes, "replay complete");

        let synced = Attach {
            producer: self.producer,
            consumer: sink,
            rx: self.rx,
            _phase: PhantomData::<Synced>,
        };
        Ok(synced)
    }
}
impl<Prod, Cons> Attach<Synced, Prod, Cons>
where
    Prod: Producer,
    Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
{
    /// Advances a replayed attachment to the `Live` phase.
    ///
    /// Pure phase transition: it logs, then moves the producer, consumer and
    /// receiver across unchanged. No item is read from the channel here.
    pub fn start_live(self) -> Attach<Live, Prod, Cons> {
        tracing::debug!(target: "engate::attach", "starting live stream");
        let Attach {
            producer,
            consumer,
            rx,
            ..
        } = self;
        Attach {
            producer,
            consumer,
            rx,
            _phase: PhantomData::<Live>,
        }
    }
}
impl<Prod, Cons> Attach<Live, Prod, Cons>
where
    Prod: Producer,
    Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
{
    /// Drains the subscription into the consumer until the channel closes.
    ///
    /// Blocks on the receiver and forwards every item to `Consumer::consume`.
    /// The loop ends once receiving fails, which for an `mpsc` channel means
    /// every sender has been dropped. The consumer is then returned.
    ///
    /// # Panics
    ///
    /// Panics if the receiver slot is empty, which the phase transitions in
    /// this file are meant to rule out.
    pub fn run(mut self) -> Cons {
        let rx = self.rx.take().expect("Attach<Live> always has rx");
        // `iter()` blocks for the next item and stops when the channel hangs up.
        for item in rx.iter() {
            self.consumer.consume(item);
        }
        self.consumer
    }

    /// Forwards at most one already-queued item to the consumer, never blocking.
    ///
    /// Returns `true` when an item was consumed. Returns `false` for any
    /// `try_recv` error, so an empty channel and a disconnected channel look
    /// the same to the caller.
    ///
    /// # Panics
    ///
    /// Panics if the receiver slot is empty, which the phase transitions in
    /// this file are meant to rule out.
    pub fn poll_one(&mut self) -> bool {
        let rx = self.rx.as_ref().expect("Attach<Live> always has rx");
        if let Ok(item) = rx.try_recv() {
            self.consumer.consume(item);
            true
        } else {
            false
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::sync::Mutex;

    /// In-memory producer: a fixed snapshot plus a batch of "live" bytes that are
    /// pushed into the subscription channel on demand.
    struct VecProducer {
        snapshot_data: Vec<u8>,
        live_data: Mutex<Vec<u8>>,
        /// Sender for the most recent subscription; `None` until `subscribe` runs
        /// and again after `flush_live_and_close`.
        tx: Mutex<Option<mpsc::Sender<u8>>>,
    }

    impl VecProducer {
        fn new(snapshot: Vec<u8>, live: Vec<u8>) -> Self {
            Self {
                snapshot_data: snapshot,
                live_data: Mutex::new(live),
                tx: Mutex::new(None),
            }
        }

        /// Sends every pending live byte, then drops the sender so the receiver
        /// observes a disconnect once it has drained the queue.
        fn flush_live_and_close(&self) {
            let tx_opt = self.tx.lock().unwrap().take();
            if let Some(tx) = tx_opt {
                for b in self.live_data.lock().unwrap().drain(..) {
                    let _ = tx.send(b);
                }
                drop(tx);
            }
        }
    }

    impl Producer for VecProducer {
        type Item = u8;
        type Snap = Vec<u8>;

        fn snapshot(&self) -> Result<Self::Snap, AttachError> {
            Ok(self.snapshot_data.clone())
        }

        fn subscribe(&self) -> Result<mpsc::Receiver<Self::Item>, AttachError> {
            let (tx, rx) = mpsc::channel();
            *self.tx.lock().unwrap() = Some(tx);
            Ok(rx)
        }
    }

    /// Consumer that appends everything it sees to a shared vector, so tests can
    /// inspect the observations after the attachment has been consumed.
    #[derive(Default)]
    struct VecConsumer(Arc<Mutex<Vec<u8>>>);

    impl Consumer for VecConsumer {
        type Item = u8;
        type Snap = Vec<u8>;

        fn replay(&mut self, snapshot: Self::Snap) {
            self.0.lock().unwrap().extend(snapshot);
        }

        fn consume(&mut self, item: Self::Item) {
            self.0.lock().unwrap().push(item);
        }
    }

    #[test]
    fn full_lifecycle_observes_snapshot_then_live() {
        let observed = Arc::new(Mutex::new(Vec::<u8>::new()));
        let prod = VecProducer::new(vec![1, 2, 3], vec![4, 5, 6]);
        let cons = VecConsumer(observed.clone());
        let attach = Attach::builder().producer(prod).consumer(cons).build();
        let (attach, history) = attach.subscribe().unwrap();
        let attach = attach.replay(history).unwrap();
        // Close the channel up front so `run` terminates instead of blocking.
        attach.producer.flush_live_and_close();
        let attach = attach.start_live();
        let _cons = attach.run();
        assert_eq!(*observed.lock().unwrap(), vec![1, 2, 3, 4, 5, 6]);
    }

    #[test]
    #[should_panic(expected = "History dropped without being consumed")]
    fn dropping_history_panics_via_dropbomb() {
        let prod = VecProducer::new(vec![], vec![]);
        let cons = VecConsumer::default();
        let attach = Attach::builder().producer(prod).consumer(cons).build();
        // `_history` is dropped at the end of the test without being replayed.
        let (_attach, _history) = attach.subscribe().unwrap();
    }

    #[test]
    fn history_size_bytes_reflects_snapshot() {
        let prod = VecProducer::new(vec![1, 2, 3, 4, 5], vec![]);
        let cons = VecConsumer::default();
        let attach = Attach::builder().producer(prod).consumer(cons).build();
        let (attach, history) = attach.subscribe().unwrap();
        assert_eq!(history.size_bytes(), 5);
        // Replay only to defuse the bomb.
        let _ = attach.replay(history);
    }

    #[test]
    fn poll_one_returns_false_on_empty_channel() {
        let prod = VecProducer::new(vec![], vec![]);
        let cons = VecConsumer::default();
        let attach = Attach::builder().producer(prod).consumer(cons).build();
        let (attach, history) = attach.subscribe().unwrap();
        let attach = attach.replay(history).unwrap();
        let mut attach = attach.start_live();
        assert!(!attach.poll_one());
    }

    /// Producer whose subscription succeeds but whose snapshot always fails.
    struct SnapshotFailingProducer;

    impl Producer for SnapshotFailingProducer {
        type Item = u8;
        type Snap = Vec<u8>;

        fn snapshot(&self) -> Result<Self::Snap, AttachError> {
            Err(AttachError::SnapshotFailed("disk full".into()))
        }

        fn subscribe(&self) -> Result<mpsc::Receiver<Self::Item>, AttachError> {
            let (_tx, rx) = mpsc::channel();
            Ok(rx)
        }
    }

    #[test]
    fn snapshot_failure_propagates_through_subscribe() {
        let prod = SnapshotFailingProducer;
        let cons = VecConsumer::default();
        let attach = Attach::builder().producer(prod).consumer(cons).build();
        let err = attach.subscribe().err().expect("snapshot must fail");
        assert!(matches!(err, AttachError::SnapshotFailed(_)));
        assert!(err.to_string().contains("disk full"));
    }

    /// Producer whose subscription always fails. The flag is shared with the
    /// test through an `Arc` because the producer itself is moved into the
    /// attachment and dropped when `subscribe` returns the error.
    struct SubscribeFailingProducer {
        snapshot_called: Arc<Mutex<bool>>,
    }

    impl Producer for SubscribeFailingProducer {
        type Item = u8;
        type Snap = Vec<u8>;

        fn snapshot(&self) -> Result<Self::Snap, AttachError> {
            *self.snapshot_called.lock().unwrap() = true;
            Ok(vec![])
        }

        fn subscribe(&self) -> Result<mpsc::Receiver<Self::Item>, AttachError> {
            Err(AttachError::SubscribeFailed("permission denied".into()))
        }
    }

    #[test]
    fn subscribe_failure_propagates_without_snapshotting() {
        let snapshot_called = Arc::new(Mutex::new(false));
        let prod = SubscribeFailingProducer {
            snapshot_called: Arc::clone(&snapshot_called),
        };
        let cons = VecConsumer::default();
        let attach = Attach::builder().producer(prod).consumer(cons).build();
        let err = attach.subscribe().err().expect("subscribe must fail");
        assert!(matches!(err, AttachError::SubscribeFailed(_)));
        // The point of the test: a failed subscribe must short-circuit before
        // the snapshot is requested.
        assert!(
            !*snapshot_called.lock().unwrap(),
            "snapshot() must not be called when subscribe() fails"
        );
    }

    #[test]
    fn attach_error_display_for_every_variant() {
        let errors = vec![
            AttachError::SnapshotFailed("a".into()),
            AttachError::SubscribeFailed("b".into()),
            AttachError::NoSuchEntity("c".into()),
            AttachError::Transport("d".into()),
        ];
        for e in errors {
            assert!(!e.to_string().is_empty());
        }
    }

    #[test]
    fn history_into_inner_defuses_bomb() {
        let prod = VecProducer::new(vec![9, 9, 9], vec![]);
        let cons = VecConsumer::default();
        let attach = Attach::builder().producer(prod).consumer(cons).build();
        let (_attach, history) = attach.subscribe().unwrap();
        // If the bomb were still armed, the test would panic when `history` is consumed and dropped.
        let snap = history.into_inner();
        assert_eq!(snap, vec![9, 9, 9]);
    }

    #[test]
    fn history_size_bytes_zero_for_empty_snapshot() {
        let prod = VecProducer::new(vec![], vec![]);
        let cons = VecConsumer::default();
        let attach = Attach::builder().producer(prod).consumer(cons).build();
        let (attach, history) = attach.subscribe().unwrap();
        assert_eq!(history.size_bytes(), 0);
        // Replay only to defuse the bomb.
        let _ = attach.replay(history);
    }

    #[test]
    fn poll_one_drains_exactly_one_then_empty() {
        let observed = Arc::new(Mutex::new(Vec::<u8>::new()));
        let prod = VecProducer::new(vec![], vec![1, 2]);
        let cons = VecConsumer(observed.clone());
        let attach = Attach::builder().producer(prod).consumer(cons).build();
        let (attach, history) = attach.subscribe().unwrap();
        let attach = attach.replay(history).unwrap();
        attach.producer.flush_live_and_close();
        let mut attach = attach.start_live();
        assert!(attach.poll_one(), "first poll should pick up an item");
        assert_eq!(*observed.lock().unwrap(), vec![1]);
        assert!(attach.poll_one(), "second poll should pick up an item");
        assert_eq!(*observed.lock().unwrap(), vec![1, 2]);
        assert!(!attach.poll_one(), "third poll should return false");
    }

    #[test]
    fn replay_fires_exactly_once_with_correct_payload() {
        let observed = Arc::new(Mutex::new(Vec::<u8>::new()));
        let snap_bytes: Vec<u8> = (0..=255).collect();
        let prod = VecProducer::new(snap_bytes.clone(), vec![]);
        let cons = VecConsumer(observed.clone());
        let attach = Attach::builder().producer(prod).consumer(cons).build();
        let (attach, history) = attach.subscribe().unwrap();
        let _attach = attach.replay(history).unwrap();
        assert_eq!(*observed.lock().unwrap(), snap_bytes);
    }

    #[test]
    fn run_returns_consumer_with_full_observation() {
        let observed = Arc::new(Mutex::new(Vec::<u8>::new()));
        let prod = VecProducer::new(vec![10, 20], vec![30, 40]);
        let cons = VecConsumer(observed.clone());
        let attach = Attach::builder().producer(prod).consumer(cons).build();
        let (attach, history) = attach.subscribe().unwrap();
        let attach = attach.replay(history).unwrap();
        attach.producer.flush_live_and_close();
        let returned_consumer = attach.start_live().run();
        assert_eq!(*returned_consumer.0.lock().unwrap(), vec![10, 20, 30, 40]);
    }

    #[test]
    fn phase_names_visible_from_attach_crate() {
        assert_eq!(<Spawned as Phase>::name(), "Spawned");
        assert_eq!(<Subscribed as Phase>::name(), "Subscribed");
        assert_eq!(<Synced as Phase>::name(), "Synced");
        assert_eq!(<Live as Phase>::name(), "Live");
    }
}