1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant, SystemTime};
7use thiserror::Error;
8use tokio::sync::RwLock;
9use uuid::Uuid;
10
11#[derive(Debug, Error, Clone, PartialEq, Eq)]
12#[error("{message}")]
13pub struct CoordinationError {
14 pub message: String,
15}
16
17impl CoordinationError {
18 pub fn new(message: impl Into<String>) -> Self {
19 Self {
20 message: message.into(),
21 }
22 }
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
26pub struct CoordinationClaim {
27 pub claim_id: String,
28 pub trigger_key: String,
29 pub expires_at: SystemTime,
30}
31
32#[async_trait]
33pub trait CoordinationStore: Send + Sync {
34 async fn claim_trigger(
35 &self,
36 trigger_key: &str,
37 ttl: Duration,
38 ) -> Result<Option<CoordinationClaim>, CoordinationError>;
39
40 async fn renew_claim(
41 &self,
42 claim_id: &str,
43 ttl: Duration,
44 ) -> Result<Option<CoordinationClaim>, CoordinationError>;
45
46 async fn release_claim(&self, claim_id: &str) -> Result<(), CoordinationError>;
47
48 async fn scratchpad_get(
49 &self,
50 namespace: &str,
51 key: &str,
52 ) -> Result<Option<Value>, CoordinationError>;
53
54 async fn scratchpad_set(
55 &self,
56 namespace: &str,
57 key: &str,
58 value: Value,
59 ttl: Duration,
60 ) -> Result<(), CoordinationError>;
61}
62
63#[derive(Clone, Default)]
64pub struct InMemoryCoordinationStore {
65 claims: Arc<RwLock<HashMap<String, InMemoryClaim>>>,
66 scratchpad: Arc<RwLock<HashMap<(String, String), InMemoryScratchpadValue>>>,
67}
68
69#[derive(Clone)]
70struct InMemoryClaim {
71 claim_id: String,
72 expires_at: Instant,
73}
74
75#[derive(Clone)]
76struct InMemoryScratchpadValue {
77 value: Value,
78 expires_at: Instant,
79}
80
81impl InMemoryCoordinationStore {
82 pub fn new() -> Self {
83 Self::default()
84 }
85
86 async fn prune_expired(&self) {
87 let now = Instant::now();
88 self.claims
89 .write()
90 .await
91 .retain(|_, claim| claim.expires_at > now);
92 self.scratchpad
93 .write()
94 .await
95 .retain(|_, entry| entry.expires_at > now);
96 }
97}
98
99#[async_trait]
100impl CoordinationStore for InMemoryCoordinationStore {
101 async fn claim_trigger(
102 &self,
103 trigger_key: &str,
104 ttl: Duration,
105 ) -> Result<Option<CoordinationClaim>, CoordinationError> {
106 self.prune_expired().await;
107 let mut claims = self.claims.write().await;
108 if claims.contains_key(trigger_key) {
109 return Ok(None);
110 }
111 let claim_id = Uuid::new_v4().to_string();
112 let expires_at = Instant::now() + ttl;
113 claims.insert(
114 trigger_key.to_string(),
115 InMemoryClaim {
116 claim_id: claim_id.clone(),
117 expires_at,
118 },
119 );
120 Ok(Some(CoordinationClaim {
121 claim_id,
122 trigger_key: trigger_key.to_string(),
123 expires_at: SystemTime::now() + ttl,
124 }))
125 }
126
127 async fn renew_claim(
128 &self,
129 claim_id: &str,
130 ttl: Duration,
131 ) -> Result<Option<CoordinationClaim>, CoordinationError> {
132 self.prune_expired().await;
133 let mut claims = self.claims.write().await;
134 let Some((trigger_key, claim)) = claims
135 .iter_mut()
136 .find(|(_, claim)| claim.claim_id == claim_id)
137 else {
138 return Ok(None);
139 };
140 claim.expires_at = Instant::now() + ttl;
141 Ok(Some(CoordinationClaim {
142 claim_id: claim_id.to_string(),
143 trigger_key: trigger_key.clone(),
144 expires_at: SystemTime::now() + ttl,
145 }))
146 }
147
148 async fn release_claim(&self, claim_id: &str) -> Result<(), CoordinationError> {
149 self.prune_expired().await;
150 let mut claims = self.claims.write().await;
151 if let Some(key) = claims
152 .iter()
153 .find_map(|(key, claim)| (claim.claim_id == claim_id).then_some(key.clone()))
154 {
155 claims.remove(&key);
156 }
157 Ok(())
158 }
159
160 async fn scratchpad_get(
161 &self,
162 namespace: &str,
163 key: &str,
164 ) -> Result<Option<Value>, CoordinationError> {
165 self.prune_expired().await;
166 Ok(self
167 .scratchpad
168 .read()
169 .await
170 .get(&(namespace.to_string(), key.to_string()))
171 .map(|entry| entry.value.clone()))
172 }
173
174 async fn scratchpad_set(
175 &self,
176 namespace: &str,
177 key: &str,
178 value: Value,
179 ttl: Duration,
180 ) -> Result<(), CoordinationError> {
181 self.prune_expired().await;
182 self.scratchpad.write().await.insert(
183 (namespace.to_string(), key.to_string()),
184 InMemoryScratchpadValue {
185 value,
186 expires_at: Instant::now() + ttl,
187 },
188 );
189 Ok(())
190 }
191}