rigatoni_core/state.rs
1// Copyright 2025 Rigatoni Contributors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! State storage for pipeline resume tokens.
18//!
19//! The [`StateStore`] trait provides an abstraction for persisting MongoDB
20//! change stream resume tokens. This enables the pipeline to resume from where
21//! it left off after restarts or failures.
22//!
23//! # Example
24//!
25//! ```rust
26//! use rigatoni_core::state::StateStore;
27//! use mongodb::bson::doc;
28//! use std::collections::HashMap;
29//!
30//! // In-memory implementation for testing
31//! #[derive(Default)]
32//! struct MemoryStateStore {
33//! tokens: std::sync::Arc<tokio::sync::Mutex<HashMap<String, mongodb::bson::Document>>>,
34//! }
35//!
36//! #[async_trait::async_trait]
37//! impl StateStore for MemoryStateStore {
38//! async fn save_resume_token(
39//! &self,
40//! collection: &str,
41//! token: &mongodb::bson::Document,
42//! ) -> Result<(), rigatoni_core::state::StateStoreError> {
43//! self.tokens.lock().await.insert(collection.to_string(), token.clone());
44//! Ok(())
45//! }
46//!
47//! async fn get_resume_token(
48//! &self,
49//! collection: &str,
50//! ) -> Result<Option<mongodb::bson::Document>, rigatoni_core::state::StateStoreError> {
51//! Ok(self.tokens.lock().await.get(collection).cloned())
52//! }
53//!
54//! async fn delete_resume_token(&self, collection: &str) -> Result<(), rigatoni_core::state::StateStoreError> {
55//! self.tokens.lock().await.remove(collection);
56//! Ok(())
57//! }
58//!
59//! async fn list_resume_tokens(
60//! &self,
61//! ) -> Result<HashMap<String, mongodb::bson::Document>, rigatoni_core::state::StateStoreError> {
62//! Ok(self.tokens.lock().await.clone())
63//! }
64//!
65//! async fn close(&self) -> Result<(), rigatoni_core::state::StateStoreError> {
66//! Ok(())
67//! }
68//!
69//! async fn try_acquire_lock(
70//! &self,
71//! _key: &str,
72//! _owner_id: &str,
73//! _ttl: std::time::Duration,
74//! ) -> Result<bool, rigatoni_core::state::StateStoreError> {
75//! Ok(true) // Single instance, always succeed
76//! }
77//!
78//! async fn refresh_lock(
79//! &self,
80//! _key: &str,
81//! _owner_id: &str,
82//! _ttl: std::time::Duration,
83//! ) -> Result<bool, rigatoni_core::state::StateStoreError> {
84//! Ok(true)
85//! }
86//!
87//! async fn release_lock(
88//! &self,
89//! _key: &str,
90//! _owner_id: &str,
91//! ) -> Result<bool, rigatoni_core::state::StateStoreError> {
92//! Ok(true)
93//! }
94//!
95//! async fn is_locked(&self, _key: &str) -> Result<bool, rigatoni_core::state::StateStoreError> {
96//! Ok(false)
97//! }
98//! }
99//! ```
100
101use mongodb::bson::Document;
102use std::collections::HashMap;
103use std::time::Duration;
104
105/// Trait for state storage backends.
106///
107/// Implementations should persist resume tokens durably to survive process restarts.
108#[async_trait::async_trait]
109pub trait StateStore {
110 /// Saves a resume token for a collection.
111 ///
112 /// # Errors
113 ///
114 /// Returns an error if the token cannot be saved.
115 async fn save_resume_token(
116 &self,
117 collection: &str,
118 token: &Document,
119 ) -> Result<(), StateStoreError>;
120
121 /// Retrieves the resume token for a collection.
122 ///
123 /// Returns `None` if no token exists for the collection.
124 ///
125 /// # Errors
126 ///
127 /// Returns an error if the token cannot be retrieved.
128 async fn get_resume_token(&self, collection: &str)
129 -> Result<Option<Document>, StateStoreError>;
130
131 /// Deletes the resume token for a collection.
132 ///
133 /// # Errors
134 ///
135 /// Returns an error if the token cannot be deleted.
136 async fn delete_resume_token(&self, collection: &str) -> Result<(), StateStoreError>;
137
138 /// Lists all resume tokens.
139 ///
140 /// Returns a map of collection names to resume tokens.
141 ///
142 /// # Errors
143 ///
144 /// Returns an error if the tokens cannot be listed.
145 async fn list_resume_tokens(&self) -> Result<HashMap<String, Document>, StateStoreError>;
146
147 /// Closes the state store, releasing any resources.
148 ///
149 /// # Errors
150 ///
151 /// Returns an error if the store cannot be closed cleanly.
152 async fn close(&self) -> Result<(), StateStoreError>;
153
154 // ==========================================================================
155 // Distributed Locking Methods
156 // ==========================================================================
157
158 /// Try to acquire a distributed lock with TTL.
159 ///
160 /// This method attempts to acquire an exclusive lock for the given key.
161 /// The lock is held by the specified owner and will automatically expire
162 /// after the TTL duration if not refreshed or released.
163 ///
164 /// # Arguments
165 ///
166 /// * `key` - Lock identifier (e.g., "rigatoni:lock:mydb:users")
167 /// * `owner_id` - Unique identifier for this instance (e.g., UUID, hostname)
168 /// * `ttl` - Time-to-live for the lock (auto-expires if owner crashes)
169 ///
170 /// # Returns
171 ///
172 /// * `Ok(true)` - Lock acquired successfully
173 /// * `Ok(false)` - Lock already held by another instance
174 /// * `Err(_)` - Error communicating with state store
175 ///
176 /// # Example
177 ///
178 /// ```rust,ignore
179 /// use std::time::Duration;
180 ///
181 /// let acquired = store.try_acquire_lock(
182 /// "rigatoni:lock:mydb:users",
183 /// "instance-abc123",
184 /// Duration::from_secs(30),
185 /// ).await?;
186 ///
187 /// if acquired {
188 /// println!("Lock acquired, safe to process");
189 /// } else {
190 /// println!("Lock held by another instance");
191 /// }
192 /// ```
193 async fn try_acquire_lock(
194 &self,
195 key: &str,
196 owner_id: &str,
197 ttl: Duration,
198 ) -> Result<bool, StateStoreError>;
199
200 /// Refresh an existing lock to extend its TTL.
201 ///
202 /// This method should be called periodically (heartbeat) to prevent
203 /// the lock from expiring during normal operation. The refresh only
204 /// succeeds if the lock is still held by the specified owner.
205 ///
206 /// # Arguments
207 ///
208 /// * `key` - Lock identifier
209 /// * `owner_id` - Unique identifier for this instance
210 /// * `ttl` - New time-to-live for the lock
211 ///
212 /// # Returns
213 ///
214 /// * `Ok(true)` - Lock refreshed successfully
215 /// * `Ok(false)` - Lock not held by this owner (was acquired by someone else or expired)
216 /// * `Err(_)` - Error communicating with state store
217 ///
218 /// # Example
219 ///
220 /// ```rust,ignore
221 /// // Refresh lock every 10 seconds with 30 second TTL
222 /// let refreshed = store.refresh_lock(
223 /// "rigatoni:lock:mydb:users",
224 /// "instance-abc123",
225 /// Duration::from_secs(30),
226 /// ).await?;
227 ///
228 /// if !refreshed {
229 /// // Lost the lock - another instance took over
230 /// panic!("Lost lock for collection");
231 /// }
232 /// ```
233 async fn refresh_lock(
234 &self,
235 key: &str,
236 owner_id: &str,
237 ttl: Duration,
238 ) -> Result<bool, StateStoreError>;
239
240 /// Release a lock.
241 ///
242 /// This method releases the lock if it is held by the specified owner.
243 /// Should be called during graceful shutdown to allow other instances
244 /// to take over immediately without waiting for TTL expiry.
245 ///
246 /// # Arguments
247 ///
248 /// * `key` - Lock identifier
249 /// * `owner_id` - Unique identifier for this instance
250 ///
251 /// # Returns
252 ///
253 /// * `Ok(true)` - Lock released successfully
254 /// * `Ok(false)` - Lock not held by this owner (already released or acquired by another)
255 /// * `Err(_)` - Error communicating with state store
256 ///
257 /// # Example
258 ///
259 /// ```rust,ignore
260 /// // Release lock on shutdown
261 /// let released = store.release_lock(
262 /// "rigatoni:lock:mydb:users",
263 /// "instance-abc123",
264 /// ).await?;
265 ///
266 /// if released {
267 /// println!("Lock released, other instances can now acquire");
268 /// }
269 /// ```
270 async fn release_lock(&self, key: &str, owner_id: &str) -> Result<bool, StateStoreError>;
271
272 /// Check if a lock is held (by any instance).
273 ///
274 /// This is a read-only operation to check if a lock exists.
275 /// Note: The result may be stale by the time it's used due to
276 /// distributed system timing.
277 ///
278 /// # Arguments
279 ///
280 /// * `key` - Lock identifier
281 ///
282 /// # Returns
283 ///
284 /// * `Ok(true)` - Lock is currently held
285 /// * `Ok(false)` - Lock is not held (available)
286 /// * `Err(_)` - Error communicating with state store
287 async fn is_locked(&self, key: &str) -> Result<bool, StateStoreError>;
288}
289
290/// Errors that can occur during state store operations.
291#[derive(Debug, thiserror::Error)]
292pub enum StateStoreError {
293 /// I/O error
294 #[error("I/O error: {0}")]
295 Io(#[from] std::io::Error),
296
297 /// Serialization error
298 #[error("Serialization error: {0}")]
299 Serialization(String),
300
301 /// Connection error
302 #[error("Connection error: {0}")]
303 Connection(String),
304
305 /// Not found error
306 #[error("Not found: {0}")]
307 NotFound(String),
308
309 /// Other errors
310 #[error("State store error: {0}")]
311 Other(String),
312}