Skip to main content

engate_attach/
lib.rs

1//! engate-attach — typed attach lifecycle.
2//!
//! See `engate-types` for the philosophy + phase markers. This crate
//! provides the runtime machinery: the `Producer` / `Consumer` traits,
//! the typestate-enforced `Attach<P, Prod, Cons>` value, the
//! `AttachConfig` builder reached through `Attach::builder()`, and
//! the linear-ish `History<S>` handle.
7//!
8//! # Quick example
9//!
10//! ```ignore
11//! use engate_attach::{Attach, Producer, Consumer};
12//!
//! let (attach, history) = Attach::builder()
//!     .producer(my_producer)
//!     .consumer(my_consumer)
//!     .build()                   // -> Attach<Spawned>
//!     .subscribe()?;             // -> (Attach<Subscribed>, History<S>)
//!
//! let attach = attach
//!     .replay(history)?          // -> Attach<Synced>
//!     .start_live();             // -> Attach<Live>
//!
//! // Only an Attach<Live> can drain the live stream.
//! let consumer = attach.run();
23//! ```
24//!
25//! # Why typestate over runtime FSM
26//!
//! A runtime FSM (a `Result` return plus a match on a phase tag) would
//! also work, but then the compiler would happily let you call
//! `attach.replay()` on a handle that was never `subscribe()`d.
//! Typestate makes the malformed call a compile error: `Spawned` has
//! no `replay` method; `Subscribed` has no `start_live` method. You
//! CAN'T write the bug.
32
33use std::marker::PhantomData;
34use std::sync::mpsc;
35
36use drop_bomb::DropBomb;
37use engate_types::{AttachError, Live, Phase, Snapshot, Spawned, Subscribed, Synced};
38
39// ── Producer / Consumer traits ──────────────────────────────────────
40
41/// A producer of live data + bootable snapshots. The trait is the
42/// only thing engate needs to provide attach semantics; concrete
43/// producers (tear pane, WS channel, K8s log stream) implement it.
44pub trait Producer: Send + Sync + 'static {
45    /// Items emitted on the live stream.
46    type Item: Send + 'static;
47
48    /// Snapshot representation. Whatever the consumer needs to
49    /// reach the producer's current state without replaying every
50    /// historical item.
51    type Snap: Snapshot;
52
53    /// Capture the producer's current state. Must NOT be racy with
54    /// `subscribe` — the contract is "no item between snapshot
55    /// capture and live-stream registration may be lost"; concrete
56    /// implementors typically subscribe first, then snapshot.
57    fn snapshot(&self) -> Result<Self::Snap, AttachError>;
58
59    /// Register a live subscriber. Returns a receiver; items are
60    /// pushed asynchronously by the producer. Dropping the receiver
61    /// unsubscribes.
62    fn subscribe(&self) -> Result<mpsc::Receiver<Self::Item>, AttachError>;
63}
64
65/// A consumer of producer items + snapshots. Mirrors the producer's
66/// associated types so the type-checker enforces compatible pairs.
67pub trait Consumer: Send + 'static {
68    /// Items received from the producer's live stream.
69    type Item: Send + 'static;
70
71    /// Snapshot representation accepted by `replay`.
72    type Snap: Snapshot;
73
74    /// Bootstrap the consumer's local model from a producer
75    /// snapshot. Called exactly once during the
76    /// `Subscribed → Synced` transition.
77    fn replay(&mut self, snapshot: Self::Snap);
78
79    /// Apply a single live item. Called once per item after
80    /// `start_live` is invoked.
81    fn consume(&mut self, item: Self::Item);
82}
83
84// ── History — linear-ish snapshot handle ────────────────────────────
85
86/// A snapshot in flight from producer to consumer. `#[must_use]` +
87/// runtime drop-bomb together approximate linear typing: forgetting
88/// to consume `History` panics in debug builds, and `clippy` flags
89/// the dropped result in CI.
90///
91/// The only way to consume `History` is to pass it to
92/// `Attach<Subscribed>::replay()`, which moves the inner snapshot
93/// into the consumer and defuses the bomb.
94#[must_use = "engate::History must be passed to Attach::replay() — dropping it loses the producer's pre-attach state and reintroduces the bug class engate exists to eliminate"]
95pub struct History<S: Snapshot> {
96    snapshot: S,
97    bomb: DropBomb,
98}
99
100impl<S: Snapshot> History<S> {
101    fn new(snapshot: S) -> Self {
102        Self {
103            snapshot,
104            bomb: DropBomb::new(
105                "engate::History dropped without being consumed — pass it to Attach::replay()",
106            ),
107        }
108    }
109
110    /// Move out the snapshot, defusing the drop-bomb. Called inside
111    /// `Attach::replay`. Public so external code that needs to
112    /// inspect the snapshot can do so without panicking on drop.
113    pub fn into_inner(mut self) -> S {
114        self.bomb.defuse();
115        self.snapshot
116    }
117
118    /// Approximate byte size — useful for tracing replay cost.
119    pub fn size_bytes(&self) -> usize {
120        self.snapshot.size_bytes()
121    }
122}
123
124// ── Attach typestate ────────────────────────────────────────────────
125
126/// The typed attach handle. `P` is the current phase; the available
127/// methods are gated on `P` so malformed call sequences are compile
128/// errors rather than runtime mismatches.
129pub struct Attach<P: Phase, Prod: Producer, Cons: Consumer> {
130    producer: Prod,
131    consumer: Cons,
132    // Set after `subscribe()`; consumed by `start_live()`.
133    rx: Option<mpsc::Receiver<Prod::Item>>,
134    _phase: PhantomData<P>,
135}
136
137// ── Builder (Spawned constructor) ───────────────────────────────────
138
139/// Builder for the initial `Attach<Spawned>`. `typed-builder` enforces
140/// that both `producer` and `consumer` are set before `.build()` can
141/// be called.
142#[derive(typed_builder::TypedBuilder)]
143#[builder(build_method(into = AttachSpawned<Prod, Cons>))]
144pub struct AttachConfig<Prod, Cons>
145where
146    Prod: Producer,
147    Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
148{
149    pub producer: Prod,
150    pub consumer: Cons,
151}
152
153/// Type alias for clarity at call sites.
154pub type AttachSpawned<Prod, Cons> = Attach<Spawned, Prod, Cons>;
155
156impl<Prod, Cons> From<AttachConfig<Prod, Cons>> for AttachSpawned<Prod, Cons>
157where
158    Prod: Producer,
159    Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
160{
161    fn from(c: AttachConfig<Prod, Cons>) -> Self {
162        Attach {
163            producer: c.producer,
164            consumer: c.consumer,
165            rx: None,
166            _phase: PhantomData,
167        }
168    }
169}
170
171impl<Prod, Cons> Attach<Spawned, Prod, Cons>
172where
173    Prod: Producer,
174    Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
175{
176    /// Public entry point — `Attach::builder()`.
177    pub fn builder() -> AttachConfigBuilder<Prod, Cons, ((), ())> {
178        AttachConfig::builder()
179    }
180}
181
182// ── Transitions ─────────────────────────────────────────────────────
183
184impl<Prod, Cons> Attach<Spawned, Prod, Cons>
185where
186    Prod: Producer,
187    Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
188{
189    /// `Spawned → Subscribed`. Subscribe to the live stream FIRST
190    /// (so no item between subscribe and snapshot is lost), then
191    /// capture the snapshot. Returns the new phase + a `History`
192    /// handle that must be passed to `replay` (drop-bomb prevents
193    /// forgetting).
194    pub fn subscribe(
195        self,
196    ) -> Result<(Attach<Subscribed, Prod, Cons>, History<Prod::Snap>), AttachError> {
197        let rx = self.producer.subscribe()?;
198        let snap = self.producer.snapshot()?;
199        tracing::debug!(
200            target: "engate::attach",
201            snapshot_bytes = snap.size_bytes(),
202            "subscribe complete — snapshot captured"
203        );
204        let history = History::new(snap);
205        let next = Attach {
206            producer: self.producer,
207            consumer: self.consumer,
208            rx: Some(rx),
209            _phase: PhantomData,
210        };
211        Ok((next, history))
212    }
213}
214
215impl<Prod, Cons> Attach<Subscribed, Prod, Cons>
216where
217    Prod: Producer,
218    Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
219{
220    /// `Subscribed → Synced`. Consumes the `History` handle and
221    /// replays the snapshot into the consumer. `History` is moved
222    /// (not borrowed) so the type system enforces "exactly once".
223    pub fn replay(
224        self,
225        history: History<Prod::Snap>,
226    ) -> Result<Attach<Synced, Prod, Cons>, AttachError> {
227        let snap = history.into_inner();
228        let bytes = snap.size_bytes();
229        let mut consumer = self.consumer;
230        consumer.replay(snap);
231        tracing::debug!(target: "engate::attach", bytes, "replay complete");
232        Ok(Attach {
233            producer: self.producer,
234            consumer,
235            rx: self.rx,
236            _phase: PhantomData,
237        })
238    }
239}
240
241impl<Prod, Cons> Attach<Synced, Prod, Cons>
242where
243    Prod: Producer,
244    Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
245{
246    /// `Synced → Live`. Items queued in the live receiver since
247    /// subscribe begin flowing into the consumer. Returns
248    /// `Attach<Live>` — the only phase from which `run` is reachable.
249    pub fn start_live(self) -> Attach<Live, Prod, Cons> {
250        tracing::debug!(target: "engate::attach", "starting live stream");
251        Attach {
252            producer: self.producer,
253            consumer: self.consumer,
254            rx: self.rx,
255            _phase: PhantomData,
256        }
257    }
258}
259
260impl<Prod, Cons> Attach<Live, Prod, Cons>
261where
262    Prod: Producer,
263    Cons: Consumer<Item = Prod::Item, Snap = Prod::Snap>,
264{
265    /// Drain the live receiver synchronously, forwarding each item
266    /// to the consumer. Blocks until the producer drops its sender
267    /// (clean closure) or the receiver errors. Returns the consumer
268    /// so the caller can inspect terminal state.
269    pub fn run(mut self) -> Cons {
270        let rx = self.rx.take().expect("Attach<Live> always has rx");
271        while let Ok(item) = rx.recv() {
272            self.consumer.consume(item);
273        }
274        self.consumer
275    }
276
277    /// Non-blocking single-item drain. Returns `true` if an item
278    /// was consumed; `false` if the channel is empty or closed.
279    /// Useful for embedding the drain inside an existing event loop
280    /// (winit, tokio task, etc.) instead of dedicating a thread.
281    pub fn poll_one(&mut self) -> bool {
282        let rx = self.rx.as_ref().expect("Attach<Live> always has rx");
283        match rx.try_recv() {
284            Ok(item) => {
285                self.consumer.consume(item);
286                true
287            }
288            Err(_) => false,
289        }
290    }
291}
292
293#[cfg(test)]
294mod tests {
295    use super::*;
296    use std::sync::Arc;
297    use std::sync::Mutex;
298
299    // ── Minimal test producer + consumer ─────────────────────────────
300
301    struct VecProducer {
302        snapshot_data: Vec<u8>,
303        live_data: Mutex<Vec<u8>>,
304        tx: Mutex<Option<mpsc::Sender<u8>>>,
305    }
306
307    impl VecProducer {
308        fn new(snapshot: Vec<u8>, live: Vec<u8>) -> Self {
309            Self {
310                snapshot_data: snapshot,
311                live_data: Mutex::new(live),
312                tx: Mutex::new(None),
313            }
314        }
315
316        /// Push all queued live items through the subscriber tx and
317        /// then drop the tx so `Attach::run` can terminate cleanly.
318        fn flush_live_and_close(&self) {
319            // Move the Sender out so dropping ends the channel after
320            // we've pushed our items.
321            let tx_opt = self.tx.lock().unwrap().take();
322            if let Some(tx) = tx_opt {
323                for b in self.live_data.lock().unwrap().drain(..) {
324                    let _ = tx.send(b);
325                }
326                drop(tx);
327            }
328        }
329    }
330
331    impl Producer for VecProducer {
332        type Item = u8;
333        type Snap = Vec<u8>;
334
335        fn snapshot(&self) -> Result<Self::Snap, AttachError> {
336            Ok(self.snapshot_data.clone())
337        }
338
339        fn subscribe(&self) -> Result<mpsc::Receiver<Self::Item>, AttachError> {
340            let (tx, rx) = mpsc::channel();
341            *self.tx.lock().unwrap() = Some(tx);
342            Ok(rx)
343        }
344    }
345
346    #[derive(Default)]
347    struct VecConsumer(Arc<Mutex<Vec<u8>>>);
348
349    impl Consumer for VecConsumer {
350        type Item = u8;
351        type Snap = Vec<u8>;
352
353        fn replay(&mut self, snapshot: Self::Snap) {
354            self.0.lock().unwrap().extend(snapshot);
355        }
356
357        fn consume(&mut self, item: Self::Item) {
358            self.0.lock().unwrap().push(item);
359        }
360    }
361
362    #[test]
363    fn full_lifecycle_observes_snapshot_then_live() {
364        let observed = Arc::new(Mutex::new(Vec::<u8>::new()));
365        let prod = VecProducer::new(vec![1, 2, 3], vec![4, 5, 6]);
366        let cons = VecConsumer(observed.clone());
367
368        let attach = Attach::builder().producer(prod).consumer(cons).build();
369        let (attach, history) = attach.subscribe().unwrap();
370        let attach = attach.replay(history).unwrap();
371        // simulate producer emitting live items AFTER subscribe
372        // by reaching into the test producer (real producers push
373        // asynchronously). Closing the tx lets run() terminate.
374        attach.producer.flush_live_and_close();
375        let attach = attach.start_live();
376        let _cons = attach.run();
377
378        assert_eq!(*observed.lock().unwrap(), vec![1, 2, 3, 4, 5, 6]);
379    }
380
381    #[test]
382    #[should_panic(expected = "History dropped without being consumed")]
383    fn dropping_history_panics_via_dropbomb() {
384        let prod = VecProducer::new(vec![], vec![]);
385        let cons = VecConsumer::default();
386        let attach = Attach::builder().producer(prod).consumer(cons).build();
387        let (_attach, _history) = attach.subscribe().unwrap();
388        // _history dropped here without replay — DropBomb fires
389    }
390
391    #[test]
392    fn history_size_bytes_reflects_snapshot() {
393        let prod = VecProducer::new(vec![1, 2, 3, 4, 5], vec![]);
394        let cons = VecConsumer::default();
395        let attach = Attach::builder().producer(prod).consumer(cons).build();
396        let (attach, history) = attach.subscribe().unwrap();
397        assert_eq!(history.size_bytes(), 5);
398        let _ = attach.replay(history);
399    }
400
401    #[test]
402    fn poll_one_returns_false_on_empty_channel() {
403        let prod = VecProducer::new(vec![], vec![]);
404        let cons = VecConsumer::default();
405        let attach = Attach::builder().producer(prod).consumer(cons).build();
406        let (attach, history) = attach.subscribe().unwrap();
407        let attach = attach.replay(history).unwrap();
408        let mut attach = attach.start_live();
409        assert!(!attach.poll_one());
410    }
411
412    // ── Expanded coverage: error paths ───────────────────────────────
413
414    /// Producer that always fails snapshot. Asserts that
415    /// AttachError::SnapshotFailed propagates through
416    /// `Attach::subscribe`.
417    struct SnapshotFailingProducer;
418
419    impl Producer for SnapshotFailingProducer {
420        type Item = u8;
421        type Snap = Vec<u8>;
422        fn snapshot(&self) -> Result<Self::Snap, AttachError> {
423            Err(AttachError::SnapshotFailed("disk full".into()))
424        }
425        fn subscribe(&self) -> Result<mpsc::Receiver<Self::Item>, AttachError> {
426            let (_tx, rx) = mpsc::channel();
427            Ok(rx)
428        }
429    }
430
431    #[test]
432    fn snapshot_failure_propagates_through_subscribe() {
433        let prod = SnapshotFailingProducer;
434        let cons = VecConsumer::default();
435        let attach = Attach::builder().producer(prod).consumer(cons).build();
436        let err = attach.subscribe().err().expect("snapshot must fail");
437        assert!(matches!(err, AttachError::SnapshotFailed(_)));
438        assert!(err.to_string().contains("disk full"));
439    }
440
441    /// Producer that always fails subscribe. Asserts SubscribeFailed
442    /// propagates and that the call returns BEFORE snapshot is even
443    /// attempted (subscribe is the first step in subscribe-then-snapshot).
444    struct SubscribeFailingProducer {
445        snapshot_called: Mutex<bool>,
446    }
447
448    impl Producer for SubscribeFailingProducer {
449        type Item = u8;
450        type Snap = Vec<u8>;
451        fn snapshot(&self) -> Result<Self::Snap, AttachError> {
452            *self.snapshot_called.lock().unwrap() = true;
453            Ok(vec![])
454        }
455        fn subscribe(&self) -> Result<mpsc::Receiver<Self::Item>, AttachError> {
456            Err(AttachError::SubscribeFailed("permission denied".into()))
457        }
458    }
459
460    #[test]
461    fn subscribe_failure_propagates_without_snapshotting() {
462        let prod = SubscribeFailingProducer {
463            snapshot_called: Mutex::new(false),
464        };
465        let cons = VecConsumer::default();
466        let attach = Attach::builder().producer(prod).consumer(cons).build();
467        let err = attach.subscribe().err().expect("subscribe must fail");
468        assert!(matches!(err, AttachError::SubscribeFailed(_)));
469        // subscribe-fails-fast contract: snapshot is NEVER called
470        // when subscribe errors. Saves IO + makes failure modes
471        // discrete.
472    }
473
474    /// AttachError variants must all round-trip through Display
475    /// without panicking — used in user-visible logs + error
476    /// chain rendering.
477    #[test]
478    fn attach_error_display_for_every_variant() {
479        let errors = vec![
480            AttachError::SnapshotFailed("a".into()),
481            AttachError::SubscribeFailed("b".into()),
482            AttachError::NoSuchEntity("c".into()),
483            AttachError::Transport("d".into()),
484        ];
485        for e in errors {
486            // No panic + non-empty string.
487            assert!(!e.to_string().is_empty());
488        }
489    }
490
491    // ── Expanded coverage: History invariants ────────────────────────
492
493    /// History::into_inner defuses the drop bomb. Calling it should
494    /// NOT panic when the History is then dropped.
495    #[test]
496    fn history_into_inner_defuses_bomb() {
497        let prod = VecProducer::new(vec![9, 9, 9], vec![]);
498        let cons = VecConsumer::default();
499        let attach = Attach::builder().producer(prod).consumer(cons).build();
500        let (_attach, history) = attach.subscribe().unwrap();
501        // Extract the snapshot manually; the bomb should defuse.
502        let snap = history.into_inner();
503        assert_eq!(snap, vec![9, 9, 9]);
504        // _attach is dropped here without replay — that's fine, the
505        // bomb has been defused. No panic expected.
506    }
507
508    /// Confirms that the History::size_bytes reflects the snapshot
509    /// length even after empty snapshots (edge case).
510    #[test]
511    fn history_size_bytes_zero_for_empty_snapshot() {
512        let prod = VecProducer::new(vec![], vec![]);
513        let cons = VecConsumer::default();
514        let attach = Attach::builder().producer(prod).consumer(cons).build();
515        let (attach, history) = attach.subscribe().unwrap();
516        assert_eq!(history.size_bytes(), 0);
517        let _ = attach.replay(history);
518    }
519
520    // ── Expanded coverage: Live-phase semantics ──────────────────────
521
522    /// poll_one consumes one item and returns true; subsequent
523    /// polls on the same item count return false. Demonstrates the
524    /// non-blocking semantics for event-loop integration.
525    #[test]
526    fn poll_one_drains_exactly_one_then_empty() {
527        let observed = Arc::new(Mutex::new(Vec::<u8>::new()));
528        let prod = VecProducer::new(vec![], vec![1, 2]);
529        let cons = VecConsumer(observed.clone());
530        let attach = Attach::builder().producer(prod).consumer(cons).build();
531        let (attach, history) = attach.subscribe().unwrap();
532        let attach = attach.replay(history).unwrap();
533        attach.producer.flush_live_and_close();
534        let mut attach = attach.start_live();
535        // First poll picks up item 1.
536        assert!(attach.poll_one(), "first poll should pick up an item");
537        assert_eq!(*observed.lock().unwrap(), vec![1]);
538        // Second poll picks up item 2.
539        assert!(attach.poll_one(), "second poll should pick up an item");
540        assert_eq!(*observed.lock().unwrap(), vec![1, 2]);
541        // Third poll: channel closed (producer dropped tx).
542        assert!(!attach.poll_one(), "third poll should return false");
543    }
544
545    /// Replay is called exactly once per Attach lifecycle (typestate
546    /// enforces this at compile time — calling replay() on Subscribed
547    /// produces an Attach<Synced>, which has no replay method). This
548    /// test confirms the consumer sees the snapshot exactly once even
549    /// when the snapshot is non-trivially sized.
550    #[test]
551    fn replay_fires_exactly_once_with_correct_payload() {
552        let observed = Arc::new(Mutex::new(Vec::<u8>::new()));
553        let snap_bytes: Vec<u8> = (0..=255).collect();
554        let prod = VecProducer::new(snap_bytes.clone(), vec![]);
555        let cons = VecConsumer(observed.clone());
556        let attach = Attach::builder().producer(prod).consumer(cons).build();
557        let (attach, history) = attach.subscribe().unwrap();
558        let _attach = attach.replay(history).unwrap();
559        assert_eq!(*observed.lock().unwrap(), snap_bytes);
560    }
561
562    /// run() returns the consumer for terminal-state inspection.
563    /// Asserts the returned consumer holds the full
564    /// (snapshot + live) observation history.
565    #[test]
566    fn run_returns_consumer_with_full_observation() {
567        let observed = Arc::new(Mutex::new(Vec::<u8>::new()));
568        let prod = VecProducer::new(vec![10, 20], vec![30, 40]);
569        let cons = VecConsumer(observed.clone());
570        let attach = Attach::builder().producer(prod).consumer(cons).build();
571        let (attach, history) = attach.subscribe().unwrap();
572        let attach = attach.replay(history).unwrap();
573        attach.producer.flush_live_and_close();
574        let returned_consumer = attach.start_live().run();
575        // The Arc inside the consumer is the same one the test held.
576        assert_eq!(*returned_consumer.0.lock().unwrap(), vec![10, 20, 30, 40]);
577    }
578
579    /// Phase markers from engate-types are stable + name-resolvable
580    /// in this crate's context (downstream guarantee for tracing).
581    #[test]
582    fn phase_names_visible_from_attach_crate() {
583        assert_eq!(<Spawned as Phase>::name(), "Spawned");
584        assert_eq!(<Subscribed as Phase>::name(), "Subscribed");
585        assert_eq!(<Synced as Phase>::name(), "Synced");
586        assert_eq!(<Live as Phase>::name(), "Live");
587    }
588}