1use std::marker::PhantomData;
34use std::sync::mpsc;
35
36use drop_bomb::DropBomb;
37use engate_types::{AttachError, Live, Phase, Snapshot, Spawned, Subscribed, Synced};
38
39pub trait Producer: Send + Sync + 'static {
45 type Item: Send + 'static;
47
48 type Snap: Snapshot;
52
53 fn snapshot(&self) -> Result<Self::Snap, AttachError>;
58
59 fn subscribe(&self) -> Result<mpsc::Receiver<Self::Item>, AttachError>;
63}
64
65pub trait Consumer: Send + 'static {
68 type Item: Send + 'static;
70
71 type Snap: Snapshot;
73
74 fn replay(&mut self, snapshot: Self::Snap);
78
79 fn consume(&mut self, item: Self::Item);
82}
83
84#[must_use = "engate::History must be passed to Attach::replay() — dropping it loses the producer's pre-attach state and reintroduces the bug class engate exists to eliminate"]
95pub struct History<S: Snapshot> {
96 snapshot: S,
97 bomb: DropBomb,
98}
99
100impl<S: Snapshot> History<S> {
101 fn new(snapshot: S) -> Self {
102 Self {
103 snapshot,
104 bomb: DropBomb::new(
105 "engate::History dropped without being consumed — pass it to Attach::replay()",
106 ),
107 }
108 }
109
110 pub fn into_inner(mut self) -> S {
114 self.bomb.defuse();
115 self.snapshot
116 }
117
118 pub fn size_bytes(&self) -> usize {
120 self.snapshot.size_bytes()
121 }
122}
123
124pub struct Attach<P: Phase, Prod: Producer, Cons: Consumer> {
130 producer: Prod,
131 consumer: Cons,
132 rx: Option<mpsc::Receiver<Prod::Item>>,
134 _phase: PhantomData<P>,
135}
136
137#[derive(typed_builder::TypedBuilder)]
143#[builder(build_method(into = AttachSpawned<Prod, Cons>))]
144pub struct AttachConfig<Prod, Cons>
145where
146 Prod: Producer,
147 Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
148{
149 pub producer: Prod,
150 pub consumer: Cons,
151}
152
153pub type AttachSpawned<Prod, Cons> = Attach<Spawned, Prod, Cons>;
155
156impl<Prod, Cons> From<AttachConfig<Prod, Cons>> for AttachSpawned<Prod, Cons>
157where
158 Prod: Producer,
159 Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
160{
161 fn from(c: AttachConfig<Prod, Cons>) -> Self {
162 Attach {
163 producer: c.producer,
164 consumer: c.consumer,
165 rx: None,
166 _phase: PhantomData,
167 }
168 }
169}
170
171impl<Prod, Cons> Attach<Spawned, Prod, Cons>
172where
173 Prod: Producer,
174 Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
175{
176 pub fn builder() -> AttachConfigBuilder<Prod, Cons, ((), ())> {
178 AttachConfig::builder()
179 }
180}
181
182impl<Prod, Cons> Attach<Spawned, Prod, Cons>
185where
186 Prod: Producer,
187 Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
188{
189 pub fn subscribe(
195 self,
196 ) -> Result<(Attach<Subscribed, Prod, Cons>, History<Prod::Snap>), AttachError> {
197 let rx = self.producer.subscribe()?;
198 let snap = self.producer.snapshot()?;
199 tracing::debug!(
200 target: "engate::attach",
201 snapshot_bytes = snap.size_bytes(),
202 "subscribe complete — snapshot captured"
203 );
204 let history = History::new(snap);
205 let next = Attach {
206 producer: self.producer,
207 consumer: self.consumer,
208 rx: Some(rx),
209 _phase: PhantomData,
210 };
211 Ok((next, history))
212 }
213}
214
215impl<Prod, Cons> Attach<Subscribed, Prod, Cons>
216where
217 Prod: Producer,
218 Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
219{
220 pub fn replay(
224 self,
225 history: History<Prod::Snap>,
226 ) -> Result<Attach<Synced, Prod, Cons>, AttachError> {
227 let snap = history.into_inner();
228 let bytes = snap.size_bytes();
229 let mut consumer = self.consumer;
230 consumer.replay(snap);
231 tracing::debug!(target: "engate::attach", bytes, "replay complete");
232 Ok(Attach {
233 producer: self.producer,
234 consumer,
235 rx: self.rx,
236 _phase: PhantomData,
237 })
238 }
239}
240
241impl<Prod, Cons> Attach<Synced, Prod, Cons>
242where
243 Prod: Producer,
244 Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
245{
246 pub fn start_live(self) -> Attach<Live, Prod, Cons> {
250 tracing::debug!(target: "engate::attach", "starting live stream");
251 Attach {
252 producer: self.producer,
253 consumer: self.consumer,
254 rx: self.rx,
255 _phase: PhantomData,
256 }
257 }
258}
259
260impl<Prod, Cons> Attach<Live, Prod, Cons>
261where
262 Prod: Producer,
263 Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
264{
265 pub fn run(mut self) -> Cons {
270 let rx = self.rx.take().expect("Attach<Live> always has rx");
271 while let Ok(item) = rx.recv() {
272 self.consumer.consume(item);
273 }
274 self.consumer
275 }
276
277 pub fn poll_one(&mut self) -> bool {
282 let rx = self.rx.as_ref().expect("Attach<Live> always has rx");
283 match rx.try_recv() {
284 Ok(item) => {
285 self.consumer.consume(item);
286 true
287 }
288 Err(_) => false,
289 }
290 }
291}
292
293#[cfg(test)]
294mod tests {
295 use super::*;
296 use std::sync::Arc;
297 use std::sync::Mutex;
298
299 struct VecProducer {
302 snapshot_data: Vec<u8>,
303 live_data: Mutex<Vec<u8>>,
304 tx: Mutex<Option<mpsc::Sender<u8>>>,
305 }
306
307 impl VecProducer {
308 fn new(snapshot: Vec<u8>, live: Vec<u8>) -> Self {
309 Self {
310 snapshot_data: snapshot,
311 live_data: Mutex::new(live),
312 tx: Mutex::new(None),
313 }
314 }
315
316 fn flush_live_and_close(&self) {
319 let tx_opt = self.tx.lock().unwrap().take();
322 if let Some(tx) = tx_opt {
323 for b in self.live_data.lock().unwrap().drain(..) {
324 let _ = tx.send(b);
325 }
326 drop(tx);
327 }
328 }
329 }
330
331 impl Producer for VecProducer {
332 type Item = u8;
333 type Snap = Vec<u8>;
334
335 fn snapshot(&self) -> Result<Self::Snap, AttachError> {
336 Ok(self.snapshot_data.clone())
337 }
338
339 fn subscribe(&self) -> Result<mpsc::Receiver<Self::Item>, AttachError> {
340 let (tx, rx) = mpsc::channel();
341 *self.tx.lock().unwrap() = Some(tx);
342 Ok(rx)
343 }
344 }
345
346 #[derive(Default)]
347 struct VecConsumer(Arc<Mutex<Vec<u8>>>);
348
349 impl Consumer for VecConsumer {
350 type Item = u8;
351 type Snap = Vec<u8>;
352
353 fn replay(&mut self, snapshot: Self::Snap) {
354 self.0.lock().unwrap().extend(snapshot);
355 }
356
357 fn consume(&mut self, item: Self::Item) {
358 self.0.lock().unwrap().push(item);
359 }
360 }
361
362 #[test]
363 fn full_lifecycle_observes_snapshot_then_live() {
364 let observed = Arc::new(Mutex::new(Vec::<u8>::new()));
365 let prod = VecProducer::new(vec![1, 2, 3], vec![4, 5, 6]);
366 let cons = VecConsumer(observed.clone());
367
368 let attach = Attach::builder().producer(prod).consumer(cons).build();
369 let (attach, history) = attach.subscribe().unwrap();
370 let attach = attach.replay(history).unwrap();
371 attach.producer.flush_live_and_close();
375 let attach = attach.start_live();
376 let _cons = attach.run();
377
378 assert_eq!(*observed.lock().unwrap(), vec![1, 2, 3, 4, 5, 6]);
379 }
380
381 #[test]
382 #[should_panic(expected = "History dropped without being consumed")]
383 fn dropping_history_panics_via_dropbomb() {
384 let prod = VecProducer::new(vec![], vec![]);
385 let cons = VecConsumer::default();
386 let attach = Attach::builder().producer(prod).consumer(cons).build();
387 let (_attach, _history) = attach.subscribe().unwrap();
388 }
390
391 #[test]
392 fn history_size_bytes_reflects_snapshot() {
393 let prod = VecProducer::new(vec![1, 2, 3, 4, 5], vec![]);
394 let cons = VecConsumer::default();
395 let attach = Attach::builder().producer(prod).consumer(cons).build();
396 let (attach, history) = attach.subscribe().unwrap();
397 assert_eq!(history.size_bytes(), 5);
398 let _ = attach.replay(history);
399 }
400
401 #[test]
402 fn poll_one_returns_false_on_empty_channel() {
403 let prod = VecProducer::new(vec![], vec![]);
404 let cons = VecConsumer::default();
405 let attach = Attach::builder().producer(prod).consumer(cons).build();
406 let (attach, history) = attach.subscribe().unwrap();
407 let attach = attach.replay(history).unwrap();
408 let mut attach = attach.start_live();
409 assert!(!attach.poll_one());
410 }
411
412 struct SnapshotFailingProducer;
418
419 impl Producer for SnapshotFailingProducer {
420 type Item = u8;
421 type Snap = Vec<u8>;
422 fn snapshot(&self) -> Result<Self::Snap, AttachError> {
423 Err(AttachError::SnapshotFailed("disk full".into()))
424 }
425 fn subscribe(&self) -> Result<mpsc::Receiver<Self::Item>, AttachError> {
426 let (_tx, rx) = mpsc::channel();
427 Ok(rx)
428 }
429 }
430
431 #[test]
432 fn snapshot_failure_propagates_through_subscribe() {
433 let prod = SnapshotFailingProducer;
434 let cons = VecConsumer::default();
435 let attach = Attach::builder().producer(prod).consumer(cons).build();
436 let err = attach.subscribe().err().expect("snapshot must fail");
437 assert!(matches!(err, AttachError::SnapshotFailed(_)));
438 assert!(err.to_string().contains("disk full"));
439 }
440
441 struct SubscribeFailingProducer {
445 snapshot_called: Mutex<bool>,
446 }
447
448 impl Producer for SubscribeFailingProducer {
449 type Item = u8;
450 type Snap = Vec<u8>;
451 fn snapshot(&self) -> Result<Self::Snap, AttachError> {
452 *self.snapshot_called.lock().unwrap() = true;
453 Ok(vec![])
454 }
455 fn subscribe(&self) -> Result<mpsc::Receiver<Self::Item>, AttachError> {
456 Err(AttachError::SubscribeFailed("permission denied".into()))
457 }
458 }
459
460 #[test]
461 fn subscribe_failure_propagates_without_snapshotting() {
462 let prod = SubscribeFailingProducer {
463 snapshot_called: Mutex::new(false),
464 };
465 let cons = VecConsumer::default();
466 let attach = Attach::builder().producer(prod).consumer(cons).build();
467 let err = attach.subscribe().err().expect("subscribe must fail");
468 assert!(matches!(err, AttachError::SubscribeFailed(_)));
469 }
473
474 #[test]
478 fn attach_error_display_for_every_variant() {
479 let errors = vec![
480 AttachError::SnapshotFailed("a".into()),
481 AttachError::SubscribeFailed("b".into()),
482 AttachError::NoSuchEntity("c".into()),
483 AttachError::Transport("d".into()),
484 ];
485 for e in errors {
486 assert!(!e.to_string().is_empty());
488 }
489 }
490
491 #[test]
496 fn history_into_inner_defuses_bomb() {
497 let prod = VecProducer::new(vec![9, 9, 9], vec![]);
498 let cons = VecConsumer::default();
499 let attach = Attach::builder().producer(prod).consumer(cons).build();
500 let (_attach, history) = attach.subscribe().unwrap();
501 let snap = history.into_inner();
503 assert_eq!(snap, vec![9, 9, 9]);
504 }
507
508 #[test]
511 fn history_size_bytes_zero_for_empty_snapshot() {
512 let prod = VecProducer::new(vec![], vec![]);
513 let cons = VecConsumer::default();
514 let attach = Attach::builder().producer(prod).consumer(cons).build();
515 let (attach, history) = attach.subscribe().unwrap();
516 assert_eq!(history.size_bytes(), 0);
517 let _ = attach.replay(history);
518 }
519
520 #[test]
526 fn poll_one_drains_exactly_one_then_empty() {
527 let observed = Arc::new(Mutex::new(Vec::<u8>::new()));
528 let prod = VecProducer::new(vec![], vec![1, 2]);
529 let cons = VecConsumer(observed.clone());
530 let attach = Attach::builder().producer(prod).consumer(cons).build();
531 let (attach, history) = attach.subscribe().unwrap();
532 let attach = attach.replay(history).unwrap();
533 attach.producer.flush_live_and_close();
534 let mut attach = attach.start_live();
535 assert!(attach.poll_one(), "first poll should pick up an item");
537 assert_eq!(*observed.lock().unwrap(), vec![1]);
538 assert!(attach.poll_one(), "second poll should pick up an item");
540 assert_eq!(*observed.lock().unwrap(), vec![1, 2]);
541 assert!(!attach.poll_one(), "third poll should return false");
543 }
544
545 #[test]
551 fn replay_fires_exactly_once_with_correct_payload() {
552 let observed = Arc::new(Mutex::new(Vec::<u8>::new()));
553 let snap_bytes: Vec<u8> = (0..=255).collect();
554 let prod = VecProducer::new(snap_bytes.clone(), vec![]);
555 let cons = VecConsumer(observed.clone());
556 let attach = Attach::builder().producer(prod).consumer(cons).build();
557 let (attach, history) = attach.subscribe().unwrap();
558 let _attach = attach.replay(history).unwrap();
559 assert_eq!(*observed.lock().unwrap(), snap_bytes);
560 }
561
562 #[test]
566 fn run_returns_consumer_with_full_observation() {
567 let observed = Arc::new(Mutex::new(Vec::<u8>::new()));
568 let prod = VecProducer::new(vec![10, 20], vec![30, 40]);
569 let cons = VecConsumer(observed.clone());
570 let attach = Attach::builder().producer(prod).consumer(cons).build();
571 let (attach, history) = attach.subscribe().unwrap();
572 let attach = attach.replay(history).unwrap();
573 attach.producer.flush_live_and_close();
574 let returned_consumer = attach.start_live().run();
575 assert_eq!(*returned_consumer.0.lock().unwrap(), vec![10, 20, 30, 40]);
577 }
578
579 #[test]
582 fn phase_names_visible_from_attach_crate() {
583 assert_eq!(<Spawned as Phase>::name(), "Spawned");
584 assert_eq!(<Subscribed as Phase>::name(), "Subscribed");
585 assert_eq!(<Synced as Phase>::name(), "Synced");
586 assert_eq!(<Live as Phase>::name(), "Live");
587 }
588}