// winterbaume_core/views.rs

//! Unified state view API for service backends.
//!
//! The [`StatefulService`] trait provides a standardized way to access and
//! modify any service's internal state through serde-compatible typed views.
//! View types are separate from internal state types to decouple the
//! serialization contract from implementation details.

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use serde::Serialize;
use serde::de::DeserializeOwned;
use tokio::io::AsyncRead;

use crate::service::MockService;
use crate::vfs::VfsError;

18/// Error type for state view operations.
19#[derive(Debug)]
20pub enum StateViewError {
21    /// Serialization or deserialization failed.
22    Serialize(serde_json::Error),
23    /// The view data is invalid or inconsistent.
24    Invalid(String),
25    /// A blob operation failed during export or import.
26    Blob(VfsError),
27}
28
29impl std::fmt::Display for StateViewError {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        match self {
32            StateViewError::Serialize(e) => write!(f, "serialization error: {e}"),
33            StateViewError::Invalid(msg) => write!(f, "invalid state: {msg}"),
34            StateViewError::Blob(e) => write!(f, "blob error: {e}"),
35        }
36    }
37}
38
39impl std::error::Error for StateViewError {
40    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
41        match self {
42            StateViewError::Serialize(e) => Some(e),
43            StateViewError::Invalid(_) => None,
44            StateViewError::Blob(e) => Some(e),
45        }
46    }
47}
48
49impl From<VfsError> for StateViewError {
50    fn from(e: VfsError) -> Self {
51        StateViewError::Blob(e)
52    }
53}
54
55impl From<serde_json::Error> for StateViewError {
56    fn from(e: serde_json::Error) -> Self {
57        StateViewError::Serialize(e)
58    }
59}
60
/// A listener called when a service's state changes.
///
/// Arguments: `account_id`, `region`, and a reference to the new state snapshot.
/// The listener runs synchronously; use `tokio::spawn` inside for async work.
pub type StateChangeListener<V> = Arc<dyn Fn(&str, &str, &V) + Send + Sync>;

/// Manages state-change subscriptions for a service.
///
/// Embed one instance in each service struct that implements [`StatefulService`]
/// and delegate the required `notifier()` method to it.
pub struct StateChangeNotifier<V> {
    /// Registered listeners, in subscription order.
    listeners: std::sync::RwLock<Vec<StateChangeListener<V>>>,
}

impl<V: Send + Sync> StateChangeNotifier<V> {
    /// Create a notifier with no registered listeners.
    pub fn new() -> Self {
        Self {
            listeners: std::sync::RwLock::new(Vec::new()),
        }
    }

    /// Register a closure to be called on every state change.
    ///
    /// Listeners are invoked in the order they were registered. It is safe to
    /// call this from inside a listener; the new listener is first invoked on
    /// the next call to [`notify`](Self::notify).
    pub fn subscribe(&self, f: impl Fn(&str, &str, &V) + Send + Sync + 'static) {
        // A poisoned lock still guards a structurally valid `Vec`, so recover
        // the guard instead of propagating an unrelated panic.
        self.listeners
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .push(Arc::new(f));
    }

    /// Invoke all registered listeners with the given snapshot.
    ///
    /// The listener list is copied (cheap `Arc` clones) and the lock released
    /// before any listener runs. Holding the read lock during the callbacks
    /// would deadlock or panic if a listener called
    /// [`subscribe`](Self::subscribe) on this same notifier.
    pub fn notify(&self, account_id: &str, region: &str, view: &V) {
        // The read guard is a temporary and is dropped at the end of this
        // statement, before the loop below starts.
        let current: Vec<StateChangeListener<V>> = self
            .listeners
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone();
        for listener in &current {
            listener(account_id, region, view);
        }
    }
}

impl<V: Send + Sync> Default for StateChangeNotifier<V> {
    /// Equivalent to [`StateChangeNotifier::new`].
    fn default() -> Self {
        Self::new()
    }
}

102/// A service that exposes typed, serde-compatible views of its internal state.
103///
104/// Each service implements this trait with a `StateView` type that is a
105/// serde-friendly representation of its internal state. The view types
106/// are separate from the internal types to decouple serialization from
107/// implementation.
108///
109/// # Example
110///
111/// ```ignore
112/// // Take a snapshot of S3 state
113/// let view: S3StateView = s3_service.snapshot("123456789012", "us-east-1");
114///
115/// // Serialize to JSON
116/// let json = serde_json::to_string_pretty(&view).unwrap();
117///
118/// // Restore from a view
119/// let view: S3StateView = serde_json::from_str(&json).unwrap();
120/// s3_service.restore("123456789012", "us-east-1", view).unwrap();
121/// ```
122#[allow(async_fn_in_trait)]
123pub trait StatefulService: MockService {
124    /// Serde-compatible view of this service's per-region state.
125    type StateView: Serialize + DeserializeOwned + Send + Sync;
126
127    /// Take a snapshot of the state for the given account/region as a typed view.
128    async fn snapshot(&self, account_id: &str, region: &str) -> Self::StateView;
129
130    /// Restore state for the given account/region from a typed view.
131    /// Replaces the existing state entirely.
132    async fn restore(
133        &self,
134        account_id: &str,
135        region: &str,
136        view: Self::StateView,
137    ) -> Result<(), StateViewError>;
138
139    /// Merge a partial view into existing state (additive, does not remove
140    /// existing resources).
141    async fn merge(
142        &self,
143        account_id: &str,
144        region: &str,
145        view: Self::StateView,
146    ) -> Result<(), StateViewError>;
147
148    /// Returns the state-change notifier embedded in this service.
149    ///
150    /// Implement by returning a reference to a `StateChangeNotifier` field
151    /// on the service struct.
152    fn notifier(&self) -> &StateChangeNotifier<Self::StateView>;
153
154    /// Take a snapshot of the given account/region and forward it to all
155    /// registered listeners. Call this after any successful state mutation.
156    ///
157    /// The write lock on the state must be released before calling this
158    /// method to avoid a deadlock (snapshot acquires a read lock).
159    async fn notify_state_changed(&self, account_id: &str, region: &str) {
160        let view = self.snapshot(account_id, region).await;
161        self.notifier().notify(account_id, region, &view);
162    }
163}
164
// ---------------------------------------------------------------------------
// Blob-backed service extension (visitor pattern, minibatch)
// ---------------------------------------------------------------------------

/// Default number of blobs per minibatch in the default `export_blobs` /
/// `import_blobs` implementations.
// NOTE(review): `export_blobs` / `import_blobs` are not defined in this
// module — confirm these names are still current.
pub const DEFAULT_BLOB_BATCH_SIZE: usize = 64;

173/// A single blob entry within an export batch.
174pub struct BlobExportEntry {
175    /// Blob key as it appears in the `StateView`.
176    pub key: String,
177    /// Streaming reader for the blob content.
178    pub reader: Box<dyn AsyncRead + Send + Unpin>,
179    /// Size in bytes, if known from a stat call.
180    pub size: Option<u64>,
181}
182
183/// Callback object for receiving exported blobs in minibatches.
184///
185/// The returned future borrows `&mut self`, so the implementation can
186/// accumulate state across calls without `Arc<Mutex<…>>`.  The caller
187/// awaits each future before calling `visit` again, so no aliasing occurs.
188///
189/// This trait is dyn-compatible — use `&mut dyn BlobVisitor` in generic code.
190pub trait BlobVisitor: Send {
191    fn visit(
192        &mut self,
193        batch: Vec<BlobExportEntry>,
194    ) -> Pin<Box<dyn Future<Output = Result<(), VfsError>> + Send + '_>>;
195}
196
197/// Callback object for supplying blob data during import.
198///
199/// Same borrowing semantics as [`BlobVisitor`]: the future borrows
200/// `&mut self` and is awaited before the next call.
201pub trait BlobSource: Send {
202    fn fetch(
203        &mut self,
204        key: String,
205    ) -> Pin<
206        Box<dyn Future<Output = Result<Box<dyn AsyncRead + Send + Unpin>, VfsError>> + Send + '_>,
207    >;
208}
209
210/// Extension trait for services whose state references blobs in a backing store.
211///
212/// The backing store is service-wide (not partitioned by account or region).
213/// Export and import use [`BlobVisitor`] / [`BlobSource`] callbacks invoked
214/// repeatedly in minibatches.
215///
216/// # Workflow
217///
218/// **Export**:
219/// ```ignore
220/// service.snapshot_with_blobs(&mut my_visitor).await?;
221/// ```
222///
223/// **Import**:
224/// ```ignore
225/// service.restore_with_blobs(view, &mut my_source).await?;
226/// ```
227#[allow(async_fn_in_trait)]
228pub trait BlobBackedService: StatefulService {
229    /// Atomically snapshot metadata and export all blobs.
230    ///
231    /// Captures the `StateView` and streams all backing store blobs
232    /// through the visitor in minibatches. Returns the view consistent
233    /// with the exported blobs.
234    async fn snapshot_with_blobs(
235        &self,
236        account_id: &str,
237        region: &str,
238        visitor: &mut dyn BlobVisitor,
239    ) -> Result<Self::StateView, StateViewError>;
240
241    /// Import blobs into the service's backing store from a source.
242    ///
243    /// The source is called for each blob key referenced in the view;
244    /// it returns a reader for that blob's content. Call this before
245    /// `restore()` so that blob keys resolve correctly.
246    async fn restore_with_blobs(
247        &self,
248        account_id: &str,
249        region: &str,
250        view: Self::StateView,
251        source: &mut dyn BlobSource,
252    ) -> Result<(), StateViewError>;
253}