Skip to main content

ai_store_sync/
sink.rs

1//! Synchronous [`ProjectionSink`] bridge.
2//!
3//! [`SyncProjectionSink`] is the sibling of [`ProjectionSink`] with sync
4//! method signatures. [`BlockingSink`] wraps an implementation of
5//! [`SyncProjectionSink`] and exposes it to the async facade.
6//!
7//! ## Two dispatch modes
8//!
9//! - [`Dispatch::SpawnBlocking`] (default via [`BlockingSink::new`]) hands
10//!   each `commit` / `on_label_set` / `on_label_deleted` off to `tokio::task::spawn_blocking`. Use
11//!   this whenever the sync method may block for more than a few hundred
12//!   microseconds — file I/O, `fsync`, database drivers with only sync APIs.
13//!   Requires a runtime with a blocking pool (any `tokio::runtime` built via
14//!   `Builder::new_multi_thread()` or `Builder::new_current_thread()` with
15//!   `enable_all()` has one).
16//!
17//! - [`Dispatch::Inline`] (via [`BlockingSink::inline`]) runs the sync method
18//!   directly on the async worker. Correct only for fast in-memory
19//!   bookkeeping — anything else stalls the runtime.
20//!
21//! ## Idempotence
22//!
23//! [`SyncProjectionSink`] inherits the same idempotence contract as
24//! [`ProjectionSink`]: replaying the same `(stream, seq)` must produce the
25//! same effect as the first application, because `Store::catch_up` and
26//! `Store::rebuild` may re-drive events after a crash or configuration
27//! change.
28
29use std::sync::Arc;
30
31use ai_store_core::{Event, Label, ProjectionSink, Seq, StoreError, StreamId};
32use async_trait::async_trait;
33use serde_json::Value;
34
35/// Synchronous variant of [`ProjectionSink`]. Implement this when the sink's
36/// body is inherently blocking (file I/O, synchronous database drivers,
37/// stdlib `println!`), then wrap in [`BlockingSink`] to plug into the async
38/// facade.
39pub trait SyncProjectionSink: Send + Sync + 'static {
40    /// Stable identifier used as the checkpoint key. See
41    /// [`ProjectionSink::id`].
42    fn id(&self) -> &str;
43
44    /// Apply one committed event. Must be idempotent under retries.
45    fn commit(
46        &self,
47        stream: &StreamId,
48        seq: Seq,
49        state: &Value,
50        event: &Event,
51    ) -> Result<(), StoreError>;
52
53    /// React to a label being pinned or moved. Default is a no-op, matching
54    /// the async trait's default.
55    fn on_label_set(
56        &self,
57        _stream: &StreamId,
58        _label: &Label,
59        _at: Seq,
60        _state: &Value,
61        _event: &Event,
62    ) -> Result<(), StoreError> {
63        Ok(())
64    }
65
66    /// React to a label being deleted. Default is a no-op, matching the
67    /// async trait's default.
68    fn on_label_deleted(&self, _stream: &StreamId, _label: &Label) -> Result<(), StoreError> {
69        Ok(())
70    }
71}
72
/// Strategy [`BlockingSink`] uses to invoke the wrapped
/// [`SyncProjectionSink`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Dispatch {
    /// Call the sync method directly on the async worker.
    ///
    /// Only safe when the method is guaranteed to return within a few
    /// hundred microseconds; anything slower blocks the runtime.
    Inline,
    /// Hand the call over to `tokio::task::spawn_blocking`.
    ///
    /// This is the default, and the correct choice for any sink that touches
    /// the file system, synchronous DB drivers, or other blocking code.
    SpawnBlocking,
}
88
89/// Adapter turning a [`SyncProjectionSink`] into a [`ProjectionSink`].
90pub struct BlockingSink<T: SyncProjectionSink> {
91    inner: Arc<T>,
92    dispatch: Dispatch,
93}
94
95impl<T: SyncProjectionSink> BlockingSink<T> {
96    /// Wrap `inner` in `spawn_blocking` dispatch — the safe default for any
97    /// sink that may block.
98    pub fn new(inner: T) -> Self {
99        Self {
100            inner: Arc::new(inner),
101            dispatch: Dispatch::SpawnBlocking,
102        }
103    }
104
105    /// Wrap `inner` in inline dispatch. Use only for fast in-memory sinks;
106    /// see [`Dispatch::Inline`].
107    pub fn inline(inner: T) -> Self {
108        Self {
109            inner: Arc::new(inner),
110            dispatch: Dispatch::Inline,
111        }
112    }
113
114    /// Access the wrapped sink. Useful for tests that need to peek at
115    /// accumulated state after the async dispatch loop advanced the
116    /// checkpoint.
117    pub fn inner(&self) -> &Arc<T> {
118        &self.inner
119    }
120}
121
122#[async_trait]
123impl<T: SyncProjectionSink> ProjectionSink for BlockingSink<T> {
124    fn id(&self) -> &str {
125        // `id()` on the trait returns a borrowed `&str` from `T`. Since
126        // `self.inner: Arc<T>` is stable for the lifetime of `&self`, this
127        // borrow is sound.
128        self.inner.id()
129    }
130
131    async fn commit(
132        &self,
133        stream: &StreamId,
134        seq: Seq,
135        state: &Value,
136        event: &Event,
137    ) -> Result<(), StoreError> {
138        match self.dispatch {
139            Dispatch::Inline => self.inner.commit(stream, seq, state, event),
140            Dispatch::SpawnBlocking => {
141                let inner = Arc::clone(&self.inner);
142                let stream = stream.clone();
143                let state = state.clone();
144                let event = event.clone();
145                tokio::task::spawn_blocking(move || inner.commit(&stream, seq, &state, &event))
146                    .await
147                    .map_err(|e| StoreError::Backend(format!("blocking sink join: {e}")))?
148            }
149        }
150    }
151
152    async fn on_label_set(
153        &self,
154        stream: &StreamId,
155        label: &Label,
156        at: Seq,
157        state: &Value,
158        event: &Event,
159    ) -> Result<(), StoreError> {
160        match self.dispatch {
161            Dispatch::Inline => self.inner.on_label_set(stream, label, at, state, event),
162            Dispatch::SpawnBlocking => {
163                let inner = Arc::clone(&self.inner);
164                let stream = stream.clone();
165                let label = label.clone();
166                let state = state.clone();
167                let event = event.clone();
168                tokio::task::spawn_blocking(move || {
169                    inner.on_label_set(&stream, &label, at, &state, &event)
170                })
171                .await
172                .map_err(|e| StoreError::Backend(format!("blocking sink join: {e}")))?
173            }
174        }
175    }
176
177    async fn on_label_deleted(&self, stream: &StreamId, label: &Label) -> Result<(), StoreError> {
178        match self.dispatch {
179            Dispatch::Inline => self.inner.on_label_deleted(stream, label),
180            Dispatch::SpawnBlocking => {
181                let inner = Arc::clone(&self.inner);
182                let stream = stream.clone();
183                let label = label.clone();
184                tokio::task::spawn_blocking(move || inner.on_label_deleted(&stream, &label))
185                    .await
186                    .map_err(|e| StoreError::Backend(format!("blocking sink join: {e}")))?
187            }
188        }
189    }
190}