Skip to main content

klauthed_data/locks/
mongo.rs

1//! MongoDB-backed [`LockManager`] using compare-and-upsert with TTL.
2//!
3//! Each lock is a document in a MongoDB collection:
4//!
5//! ```text
6//! { _id: <key>, token: <uuid-string>, expires_at: <ISO8601 string> }
7//! ```
8//!
//! **Acquire** — `find_one_and_update` with:
//! * filter `{_id: key, expires_at: {$lte: now}}` (matches only an existing,
//!   already-expired document for `key`),
//! * update `{$set: {token: new_token, expires_at: now+ttl}}`,
//! * `upsert: true` (inserts the document when the filter matches nothing,
//!   which covers the absent-key case), `return_document: After`.
//!
//! If the document is held by a live token the filter matches nothing, so the
//! upsert attempts to insert a second document with the same `_id`; the server
//! rejects that with a duplicate-key error, which is treated as contention
//! (`Ok(None)`). The same error arises when two writers race to insert an
//! absent key. A `None` result from `find_one_and_update` is likewise mapped
//! to `Ok(None)`.
18//!
19//! **Release** — `delete_one({_id: key, token: token_string})` — compare-and-
20//! delete so only the holder that set this token removes the document.
21//!
22//! Live tests are marked `#[ignore]`; run with a running MongoDB at
23//! `MONGODB_URL` via:
24//! ```text
25//! cargo test -p klauthed-data --features mongodb -- --ignored
26//! ```
27
28use async_trait::async_trait;
29use klauthed_core::time::Duration;
30use klauthed_core::time::Timestamp;
31use mongodb::Collection;
32use mongodb::Database;
33use mongodb::bson::{Document, doc};
34use mongodb::options::{FindOneAndUpdateOptions, ReturnDocument};
35
36use crate::error::DataError;
37use crate::locks::{LockGuard, LockManager, LockToken};
38
/// Default collection name for lock documents.
///
/// Used by [`MongoLockManager::new`]; callers that need a different
/// collection go through [`MongoLockManager::with_collection`] instead.
const DEFAULT_COLLECTION: &str = "locks";
41
/// A [`LockManager`] that stores TTL-bounded locks in a MongoDB collection.
///
/// Each held lock is one document keyed by the lock name (`_id`) and carrying
/// the holder's `token` plus an `expires_at` timestamp string.
///
/// Clone-cheap: holds only the collection handle (an `Arc` internally).
#[derive(Clone)]
pub struct MongoLockManager {
    /// Collection in which the lock documents live.
    collection: Collection<Document>,
}
49
50impl MongoLockManager {
51    /// Wrap an existing database, using the default collection name `"locks"`.
52    pub fn new(db: &Database) -> Self {
53        Self::with_collection(db, DEFAULT_COLLECTION)
54    }
55
56    /// Wrap an existing database, using `collection_name` as the target.
57    pub fn with_collection(db: &Database, collection_name: &str) -> Self {
58        Self { collection: db.collection(collection_name) }
59    }
60
61    /// Release `key` only if `token` still owns it.
62    ///
63    /// Returns `true` if the lock was held and is now released, `false` if it
64    /// had already expired or been taken by a different holder.
65    pub async fn release_token(&self, key: &str, token: LockToken) -> Result<bool, DataError> {
66        let filter = doc! {
67            "_id":   key,
68            "token": token.to_string(),
69        };
70        let result = self
71            .collection
72            .delete_one(filter)
73            .await
74            .map_err(|e| DataError::LockHeld(format!("mongo delete_one failed: {e}")))?;
75        Ok(result.deleted_count > 0)
76    }
77}
78
79#[async_trait]
80impl LockManager for MongoLockManager {
81    async fn acquire(&self, key: &str, ttl: Duration) -> Result<Option<LockGuard>, DataError> {
82        let now = Timestamp::now();
83        let expires_at = now
84            .checked_add(ttl)
85            .ok_or_else(|| DataError::LockHeld(format!("invalid TTL for lock '{key}'")))?;
86
87        let now_str = now.to_rfc3339();
88        let expires_str = expires_at.to_rfc3339();
89        let token = LockToken::new();
90        let token_str = token.to_string();
91
92        // Match documents that are absent or whose `expires_at` has passed.
93        let filter = doc! {
94            "_id":        key,
95            "expires_at": { "$lte": &now_str },
96        };
97        let update = doc! {
98            "$set": {
99                "token":      &token_str,
100                "expires_at": &expires_str,
101            }
102        };
103        let options = FindOneAndUpdateOptions::builder()
104            .upsert(Some(true))
105            .return_document(Some(ReturnDocument::After))
106            .build();
107
108        let result =
109            self.collection.find_one_and_update(filter, update).with_options(options).await;
110
111        match result {
112            Ok(Some(doc)) => {
113                // Verify the returned document has our token — if another writer
114                // upserted at the same instant it could carry a different token.
115                let doc_token = doc.get_str("token").unwrap_or_default();
116                if doc_token == token_str {
117                    Ok(Some(LockGuard::mongo(key.to_owned(), token, self.clone())))
118                } else {
119                    Ok(None)
120                }
121            }
122            // The filter matched nothing (key exists with a live token) — contention.
123            Ok(None) => Ok(None),
124            Err(e) => {
125                // A duplicate-key error means two upserts raced; treat as contention.
126                let msg = e.to_string();
127                if msg.contains("11000") || msg.contains("DuplicateKey") {
128                    Ok(None)
129                } else {
130                    Err(DataError::LockHeld(format!("mongo find_one_and_update failed: {e}")))
131                }
132            }
133        }
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use klauthed_core::id::Id;

    /// Connect to the server named by `MONGODB_URL` (falling back to a local
    /// default) and return a manager bound to a freshly named test database.
    async fn live_manager() -> MongoLockManager {
        let url = match std::env::var("MONGODB_URL") {
            Ok(configured) => configured,
            Err(_) => "mongodb://127.0.0.1:27017".to_owned(),
        };
        let client = mongodb::Client::with_uri_str(&url).await.expect("connect mongodb");
        let database = client.database(&format!("klauthed_test_{}", Id::<()>::new()));
        MongoLockManager::new(&database)
    }

    /// A held lock rejects a second acquire, and becomes available again once
    /// the holder releases it.
    #[tokio::test]
    #[ignore = "requires a live MongoDB at MONGODB_URL"]
    async fn acquire_blocks_and_releases() {
        let locks = live_manager().await;
        let key = format!("klauthed:test:lock:{}", LockToken::new());
        let ttl = || Duration::seconds(30);

        // First caller wins the lock.
        let first = locks.acquire(&key, ttl()).await.unwrap();
        let guard = first.expect("first acquire wins");

        // While it is held, a second attempt reports contention.
        let contended = locks.acquire(&key, ttl()).await.unwrap();
        assert!(contended.is_none());

        guard.release().await.unwrap();

        // After release the key can be taken again.
        assert!(locks.acquire(&key, ttl()).await.unwrap().is_some());
    }
}