klauthed_data/locks/
mongo.rs1use async_trait::async_trait;
29use klauthed_core::time::Duration;
30use klauthed_core::time::Timestamp;
31use mongodb::Collection;
32use mongodb::Database;
33use mongodb::bson::{Document, doc};
34use mongodb::options::{FindOneAndUpdateOptions, ReturnDocument};
35
36use crate::error::DataError;
37use crate::locks::{LockGuard, LockManager, LockToken};
38
/// Collection name used by [`MongoLockManager::new`], which does not take one from the caller.
const DEFAULT_COLLECTION: &str = "locks";
41
42#[derive(Clone)]
46pub struct MongoLockManager {
47 collection: Collection<Document>,
48}
49
50impl MongoLockManager {
51 pub fn new(db: &Database) -> Self {
53 Self::with_collection(db, DEFAULT_COLLECTION)
54 }
55
56 pub fn with_collection(db: &Database, collection_name: &str) -> Self {
58 Self { collection: db.collection(collection_name) }
59 }
60
61 pub async fn release_token(&self, key: &str, token: LockToken) -> Result<bool, DataError> {
66 let filter = doc! {
67 "_id": key,
68 "token": token.to_string(),
69 };
70 let result = self
71 .collection
72 .delete_one(filter)
73 .await
74 .map_err(|e| DataError::LockHeld(format!("mongo delete_one failed: {e}")))?;
75 Ok(result.deleted_count > 0)
76 }
77}
78
79#[async_trait]
80impl LockManager for MongoLockManager {
81 async fn acquire(&self, key: &str, ttl: Duration) -> Result<Option<LockGuard>, DataError> {
82 let now = Timestamp::now();
83 let expires_at = now
84 .checked_add(ttl)
85 .ok_or_else(|| DataError::LockHeld(format!("invalid TTL for lock '{key}'")))?;
86
87 let now_str = now.to_rfc3339();
88 let expires_str = expires_at.to_rfc3339();
89 let token = LockToken::new();
90 let token_str = token.to_string();
91
92 let filter = doc! {
94 "_id": key,
95 "expires_at": { "$lte": &now_str },
96 };
97 let update = doc! {
98 "$set": {
99 "token": &token_str,
100 "expires_at": &expires_str,
101 }
102 };
103 let options = FindOneAndUpdateOptions::builder()
104 .upsert(Some(true))
105 .return_document(Some(ReturnDocument::After))
106 .build();
107
108 let result =
109 self.collection.find_one_and_update(filter, update).with_options(options).await;
110
111 match result {
112 Ok(Some(doc)) => {
113 let doc_token = doc.get_str("token").unwrap_or_default();
116 if doc_token == token_str {
117 Ok(Some(LockGuard::mongo(key.to_owned(), token, self.clone())))
118 } else {
119 Ok(None)
120 }
121 }
122 Ok(None) => Ok(None),
124 Err(e) => {
125 let msg = e.to_string();
127 if msg.contains("11000") || msg.contains("DuplicateKey") {
128 Ok(None)
129 } else {
130 Err(DataError::LockHeld(format!("mongo find_one_and_update failed: {e}")))
131 }
132 }
133 }
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use klauthed_core::id::Id;

    /// Connects to the server at `MONGODB_URL` (falling back to a local
    /// default) and returns a manager on a database named after a fresh `Id`.
    async fn live_manager() -> MongoLockManager {
        let url = match std::env::var("MONGODB_URL") {
            Ok(configured) => configured,
            Err(_) => "mongodb://127.0.0.1:27017".to_owned(),
        };
        let client = mongodb::Client::with_uri_str(&url).await.expect("connect mongodb");
        let scratch_db = client.database(&format!("klauthed_test_{}", Id::<()>::new()));
        MongoLockManager::new(&scratch_db)
    }

    /// A held lock rejects a second acquire and can be taken again once released.
    #[tokio::test]
    #[ignore = "requires a live MongoDB at MONGODB_URL"]
    async fn acquire_blocks_and_releases() {
        let manager = live_manager().await;
        let key = format!("klauthed:test:lock:{}", LockToken::new());

        // First attempt on a fresh key must succeed.
        let first = manager.acquire(&key, Duration::seconds(30)).await.unwrap();
        let guard = first.expect("first acquire wins");

        // While the guard is alive, a second attempt must be refused.
        let contended = manager.acquire(&key, Duration::seconds(30)).await.unwrap();
        assert!(contended.is_none());

        guard.release().await.unwrap();

        // After release the key is available again.
        assert!(manager.acquire(&key, Duration::seconds(30)).await.unwrap().is_some());
    }
}