rigatoni_core/
state.rs

1// Copyright 2025 Rigatoni Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! State storage for pipeline resume tokens.
18//!
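//! The [`StateStore`] trait provides an abstraction for persisting MongoDB
//! change stream resume tokens. This enables the pipeline to resume from where
//! it left off after restarts or failures. The trait also defines TTL-based
//! distributed locking methods so that multiple pipeline instances can
//! coordinate which one owns a given lock key.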
22//!
23//! # Example
24//!
25//! ```rust
26//! use rigatoni_core::state::StateStore;
27//! use mongodb::bson::doc;
28//! use std::collections::HashMap;
29//!
30//! // In-memory implementation for testing
31//! #[derive(Default)]
32//! struct MemoryStateStore {
33//!     tokens: std::sync::Arc<tokio::sync::Mutex<HashMap<String, mongodb::bson::Document>>>,
34//! }
35//!
36//! #[async_trait::async_trait]
37//! impl StateStore for MemoryStateStore {
38//!     async fn save_resume_token(
39//!         &self,
40//!         collection: &str,
41//!         token: &mongodb::bson::Document,
42//!     ) -> Result<(), rigatoni_core::state::StateStoreError> {
43//!         self.tokens.lock().await.insert(collection.to_string(), token.clone());
44//!         Ok(())
45//!     }
46//!
47//!     async fn get_resume_token(
48//!         &self,
49//!         collection: &str,
50//!     ) -> Result<Option<mongodb::bson::Document>, rigatoni_core::state::StateStoreError> {
51//!         Ok(self.tokens.lock().await.get(collection).cloned())
52//!     }
53//!
54//!     async fn delete_resume_token(&self, collection: &str) -> Result<(), rigatoni_core::state::StateStoreError> {
55//!         self.tokens.lock().await.remove(collection);
56//!         Ok(())
57//!     }
58//!
59//!     async fn list_resume_tokens(
60//!         &self,
61//!     ) -> Result<HashMap<String, mongodb::bson::Document>, rigatoni_core::state::StateStoreError> {
62//!         Ok(self.tokens.lock().await.clone())
63//!     }
64//!
65//!     async fn close(&self) -> Result<(), rigatoni_core::state::StateStoreError> {
66//!         Ok(())
67//!     }
68//!
69//!     async fn try_acquire_lock(
70//!         &self,
71//!         _key: &str,
72//!         _owner_id: &str,
73//!         _ttl: std::time::Duration,
74//!     ) -> Result<bool, rigatoni_core::state::StateStoreError> {
75//!         Ok(true) // Single instance, always succeed
76//!     }
77//!
78//!     async fn refresh_lock(
79//!         &self,
80//!         _key: &str,
81//!         _owner_id: &str,
82//!         _ttl: std::time::Duration,
83//!     ) -> Result<bool, rigatoni_core::state::StateStoreError> {
84//!         Ok(true)
85//!     }
86//!
87//!     async fn release_lock(
88//!         &self,
89//!         _key: &str,
90//!         _owner_id: &str,
91//!     ) -> Result<bool, rigatoni_core::state::StateStoreError> {
92//!         Ok(true)
93//!     }
94//!
95//!     async fn is_locked(&self, _key: &str) -> Result<bool, rigatoni_core::state::StateStoreError> {
96//!         Ok(false)
97//!     }
98//! }
99//! ```
100
101use mongodb::bson::Document;
102use std::collections::HashMap;
103use std::time::Duration;
104
105/// Trait for state storage backends.
106///
107/// Implementations should persist resume tokens durably to survive process restarts.
108#[async_trait::async_trait]
109pub trait StateStore {
110    /// Saves a resume token for a collection.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if the token cannot be saved.
115    async fn save_resume_token(
116        &self,
117        collection: &str,
118        token: &Document,
119    ) -> Result<(), StateStoreError>;
120
121    /// Retrieves the resume token for a collection.
122    ///
123    /// Returns `None` if no token exists for the collection.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the token cannot be retrieved.
128    async fn get_resume_token(&self, collection: &str)
129        -> Result<Option<Document>, StateStoreError>;
130
131    /// Deletes the resume token for a collection.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if the token cannot be deleted.
136    async fn delete_resume_token(&self, collection: &str) -> Result<(), StateStoreError>;
137
138    /// Lists all resume tokens.
139    ///
140    /// Returns a map of collection names to resume tokens.
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the tokens cannot be listed.
145    async fn list_resume_tokens(&self) -> Result<HashMap<String, Document>, StateStoreError>;
146
147    /// Closes the state store, releasing any resources.
148    ///
149    /// # Errors
150    ///
151    /// Returns an error if the store cannot be closed cleanly.
152    async fn close(&self) -> Result<(), StateStoreError>;
153
154    // ==========================================================================
155    // Distributed Locking Methods
156    // ==========================================================================
157
158    /// Try to acquire a distributed lock with TTL.
159    ///
160    /// This method attempts to acquire an exclusive lock for the given key.
161    /// The lock is held by the specified owner and will automatically expire
162    /// after the TTL duration if not refreshed or released.
163    ///
164    /// # Arguments
165    ///
166    /// * `key` - Lock identifier (e.g., "rigatoni:lock:mydb:users")
167    /// * `owner_id` - Unique identifier for this instance (e.g., UUID, hostname)
168    /// * `ttl` - Time-to-live for the lock (auto-expires if owner crashes)
169    ///
170    /// # Returns
171    ///
172    /// * `Ok(true)` - Lock acquired successfully
173    /// * `Ok(false)` - Lock already held by another instance
174    /// * `Err(_)` - Error communicating with state store
175    ///
176    /// # Example
177    ///
178    /// ```rust,ignore
179    /// use std::time::Duration;
180    ///
181    /// let acquired = store.try_acquire_lock(
182    ///     "rigatoni:lock:mydb:users",
183    ///     "instance-abc123",
184    ///     Duration::from_secs(30),
185    /// ).await?;
186    ///
187    /// if acquired {
188    ///     println!("Lock acquired, safe to process");
189    /// } else {
190    ///     println!("Lock held by another instance");
191    /// }
192    /// ```
193    async fn try_acquire_lock(
194        &self,
195        key: &str,
196        owner_id: &str,
197        ttl: Duration,
198    ) -> Result<bool, StateStoreError>;
199
200    /// Refresh an existing lock to extend its TTL.
201    ///
202    /// This method should be called periodically (heartbeat) to prevent
203    /// the lock from expiring during normal operation. The refresh only
204    /// succeeds if the lock is still held by the specified owner.
205    ///
206    /// # Arguments
207    ///
208    /// * `key` - Lock identifier
209    /// * `owner_id` - Unique identifier for this instance
210    /// * `ttl` - New time-to-live for the lock
211    ///
212    /// # Returns
213    ///
214    /// * `Ok(true)` - Lock refreshed successfully
215    /// * `Ok(false)` - Lock not held by this owner (was acquired by someone else or expired)
216    /// * `Err(_)` - Error communicating with state store
217    ///
218    /// # Example
219    ///
220    /// ```rust,ignore
221    /// // Refresh lock every 10 seconds with 30 second TTL
222    /// let refreshed = store.refresh_lock(
223    ///     "rigatoni:lock:mydb:users",
224    ///     "instance-abc123",
225    ///     Duration::from_secs(30),
226    /// ).await?;
227    ///
228    /// if !refreshed {
229    ///     // Lost the lock - another instance took over
230    ///     panic!("Lost lock for collection");
231    /// }
232    /// ```
233    async fn refresh_lock(
234        &self,
235        key: &str,
236        owner_id: &str,
237        ttl: Duration,
238    ) -> Result<bool, StateStoreError>;
239
240    /// Release a lock.
241    ///
242    /// This method releases the lock if it is held by the specified owner.
243    /// Should be called during graceful shutdown to allow other instances
244    /// to take over immediately without waiting for TTL expiry.
245    ///
246    /// # Arguments
247    ///
248    /// * `key` - Lock identifier
249    /// * `owner_id` - Unique identifier for this instance
250    ///
251    /// # Returns
252    ///
253    /// * `Ok(true)` - Lock released successfully
254    /// * `Ok(false)` - Lock not held by this owner (already released or acquired by another)
255    /// * `Err(_)` - Error communicating with state store
256    ///
257    /// # Example
258    ///
259    /// ```rust,ignore
260    /// // Release lock on shutdown
261    /// let released = store.release_lock(
262    ///     "rigatoni:lock:mydb:users",
263    ///     "instance-abc123",
264    /// ).await?;
265    ///
266    /// if released {
267    ///     println!("Lock released, other instances can now acquire");
268    /// }
269    /// ```
270    async fn release_lock(&self, key: &str, owner_id: &str) -> Result<bool, StateStoreError>;
271
272    /// Check if a lock is held (by any instance).
273    ///
274    /// This is a read-only operation to check if a lock exists.
275    /// Note: The result may be stale by the time it's used due to
276    /// distributed system timing.
277    ///
278    /// # Arguments
279    ///
280    /// * `key` - Lock identifier
281    ///
282    /// # Returns
283    ///
284    /// * `Ok(true)` - Lock is currently held
285    /// * `Ok(false)` - Lock is not held (available)
286    /// * `Err(_)` - Error communicating with state store
287    async fn is_locked(&self, key: &str) -> Result<bool, StateStoreError>;
288}
289
290/// Errors that can occur during state store operations.
291#[derive(Debug, thiserror::Error)]
292pub enum StateStoreError {
293    /// I/O error
294    #[error("I/O error: {0}")]
295    Io(#[from] std::io::Error),
296
297    /// Serialization error
298    #[error("Serialization error: {0}")]
299    Serialization(String),
300
301    /// Connection error
302    #[error("Connection error: {0}")]
303    Connection(String),
304
305    /// Not found error
306    #[error("Not found: {0}")]
307    NotFound(String),
308
309    /// Other errors
310    #[error("State store error: {0}")]
311    Other(String),
312}