Skip to main content

rain_engine_core/
coordination.rs

1use async_trait::async_trait;
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{Duration, Instant, SystemTime};
7use thiserror::Error;
8use tokio::sync::RwLock;
9use uuid::Uuid;
10
11#[derive(Debug, Error, Clone, PartialEq, Eq)]
12#[error("{message}")]
13pub struct CoordinationError {
14    pub message: String,
15}
16
17impl CoordinationError {
18    pub fn new(message: impl Into<String>) -> Self {
19        Self {
20            message: message.into(),
21        }
22    }
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
26pub struct CoordinationClaim {
27    pub claim_id: String,
28    pub trigger_key: String,
29    pub expires_at: SystemTime,
30}
31
32#[async_trait]
33pub trait CoordinationStore: Send + Sync {
34    async fn claim_trigger(
35        &self,
36        trigger_key: &str,
37        ttl: Duration,
38    ) -> Result<Option<CoordinationClaim>, CoordinationError>;
39
40    async fn renew_claim(
41        &self,
42        claim_id: &str,
43        ttl: Duration,
44    ) -> Result<Option<CoordinationClaim>, CoordinationError>;
45
46    async fn release_claim(&self, claim_id: &str) -> Result<(), CoordinationError>;
47
48    async fn scratchpad_get(
49        &self,
50        namespace: &str,
51        key: &str,
52    ) -> Result<Option<Value>, CoordinationError>;
53
54    async fn scratchpad_set(
55        &self,
56        namespace: &str,
57        key: &str,
58        value: Value,
59        ttl: Duration,
60    ) -> Result<(), CoordinationError>;
61}
62
63#[derive(Clone, Default)]
64pub struct InMemoryCoordinationStore {
65    claims: Arc<RwLock<HashMap<String, InMemoryClaim>>>,
66    scratchpad: Arc<RwLock<HashMap<(String, String), InMemoryScratchpadValue>>>,
67}
68
69#[derive(Clone)]
70struct InMemoryClaim {
71    claim_id: String,
72    expires_at: Instant,
73}
74
75#[derive(Clone)]
76struct InMemoryScratchpadValue {
77    value: Value,
78    expires_at: Instant,
79}
80
81impl InMemoryCoordinationStore {
82    pub fn new() -> Self {
83        Self::default()
84    }
85
86    async fn prune_expired(&self) {
87        let now = Instant::now();
88        self.claims
89            .write()
90            .await
91            .retain(|_, claim| claim.expires_at > now);
92        self.scratchpad
93            .write()
94            .await
95            .retain(|_, entry| entry.expires_at > now);
96    }
97}
98
99#[async_trait]
100impl CoordinationStore for InMemoryCoordinationStore {
101    async fn claim_trigger(
102        &self,
103        trigger_key: &str,
104        ttl: Duration,
105    ) -> Result<Option<CoordinationClaim>, CoordinationError> {
106        self.prune_expired().await;
107        let mut claims = self.claims.write().await;
108        if claims.contains_key(trigger_key) {
109            return Ok(None);
110        }
111        let claim_id = Uuid::new_v4().to_string();
112        let expires_at = Instant::now() + ttl;
113        claims.insert(
114            trigger_key.to_string(),
115            InMemoryClaim {
116                claim_id: claim_id.clone(),
117                expires_at,
118            },
119        );
120        Ok(Some(CoordinationClaim {
121            claim_id,
122            trigger_key: trigger_key.to_string(),
123            expires_at: SystemTime::now() + ttl,
124        }))
125    }
126
127    async fn renew_claim(
128        &self,
129        claim_id: &str,
130        ttl: Duration,
131    ) -> Result<Option<CoordinationClaim>, CoordinationError> {
132        self.prune_expired().await;
133        let mut claims = self.claims.write().await;
134        let Some((trigger_key, claim)) = claims
135            .iter_mut()
136            .find(|(_, claim)| claim.claim_id == claim_id)
137        else {
138            return Ok(None);
139        };
140        claim.expires_at = Instant::now() + ttl;
141        Ok(Some(CoordinationClaim {
142            claim_id: claim_id.to_string(),
143            trigger_key: trigger_key.clone(),
144            expires_at: SystemTime::now() + ttl,
145        }))
146    }
147
148    async fn release_claim(&self, claim_id: &str) -> Result<(), CoordinationError> {
149        self.prune_expired().await;
150        let mut claims = self.claims.write().await;
151        if let Some(key) = claims
152            .iter()
153            .find_map(|(key, claim)| (claim.claim_id == claim_id).then_some(key.clone()))
154        {
155            claims.remove(&key);
156        }
157        Ok(())
158    }
159
160    async fn scratchpad_get(
161        &self,
162        namespace: &str,
163        key: &str,
164    ) -> Result<Option<Value>, CoordinationError> {
165        self.prune_expired().await;
166        Ok(self
167            .scratchpad
168            .read()
169            .await
170            .get(&(namespace.to_string(), key.to_string()))
171            .map(|entry| entry.value.clone()))
172    }
173
174    async fn scratchpad_set(
175        &self,
176        namespace: &str,
177        key: &str,
178        value: Value,
179        ttl: Duration,
180    ) -> Result<(), CoordinationError> {
181        self.prune_expired().await;
182        self.scratchpad.write().await.insert(
183            (namespace.to_string(), key.to_string()),
184            InMemoryScratchpadValue {
185                value,
186                expires_at: Instant::now() + ttl,
187            },
188        );
189        Ok(())
190    }
191}