Skip to main content

openlatch_client/daemon/
dedup.rs

1/// In-memory duplicate event detection store with a 100ms TTL.
2///
3/// Deduplicates events by hashing (session_id + tool_name + canonical JSON of tool_input).
4/// Duplicate events within the TTL window return the same verdict but are NOT logged,
5/// preventing log spam from repeated hook calls within a single agent operation.
6///
7/// # Architecture
8///
9/// Uses `DashMap` for lock-free concurrent access. The hash key is a `u64` computed by
10/// SipHash (via `DefaultHasher`). SipHash is adequate here — this is an internal dedup
11/// mechanism, not a security hash. Collisions within a 100ms window are acceptable:
12/// a false-positive dedup causes a single event to be skipped, not a security failure.
13///
14/// # Performance
15///
16/// PERFORMANCE: `DashMap` provides O(1) average-case reads/writes with no global lock.
17/// The 100ms TTL window is small enough that the map stays tiny in practice.
18use dashmap::DashMap;
19use std::collections::hash_map::DefaultHasher;
20use std::hash::{Hash, Hasher};
21use std::time::{Duration, Instant};
22
23/// Deduplication TTL window per EVNT-06.
24const DEDUP_TTL: Duration = Duration::from_millis(100);
25
26/// In-memory store for deduplicating events within a configurable TTL window.
27///
28/// Thread-safe via `DashMap`. All methods take `&self` (no exclusive lock needed).
29pub struct DedupStore {
30    inner: DashMap<u64, Instant>,
31    ttl: Duration,
32}
33
34impl DedupStore {
35    /// Create a new `DedupStore` with the default 100ms TTL.
36    pub fn new() -> Self {
37        Self {
38            inner: DashMap::new(),
39            ttl: DEDUP_TTL,
40        }
41    }
42
43    /// Check if this event is a duplicate and insert it if it is not.
44    ///
45    /// Returns `true` if the event was seen within the TTL window (it IS a duplicate).
46    /// Returns `false` if the event is new and inserts it for future dedup checks.
47    ///
48    /// The dedup key is: SHA-like hash of `(session_id, tool_name, canonical_json(tool_input))`.
49    pub fn check_and_insert(
50        &self,
51        session_id: &str,
52        tool_name: &str,
53        tool_input: &serde_json::Value,
54    ) -> bool {
55        let key = self.compute_hash(session_id, tool_name, tool_input);
56        let now = Instant::now();
57
58        if let Some(entry) = self.inner.get(&key) {
59            if now.duration_since(*entry.value()) < self.ttl {
60                return true; // duplicate within TTL
61            }
62        }
63        self.inner.insert(key, now);
64        false
65    }
66
67    /// Compute a dedup key by hashing the event's identity fields.
68    ///
69    /// Uses `DefaultHasher` (SipHash) — fast and sufficient for collision avoidance
70    /// within a 100ms window. This is NOT a security hash.
71    ///
72    /// Tool input is hashed directly from the JSON Value tree with keys sorted at each
73    /// level, ensuring deterministic hashing regardless of JSON key order in the payload.
74    /// This avoids the allocations of serializing to a canonical JSON string.
75    fn compute_hash(
76        &self,
77        session_id: &str,
78        tool_name: &str,
79        tool_input: &serde_json::Value,
80    ) -> u64 {
81        let mut hasher = DefaultHasher::new();
82        session_id.hash(&mut hasher);
83        tool_name.hash(&mut hasher);
84        hash_value(tool_input, &mut hasher);
85        hasher.finish()
86    }
87
88    /// Evict all entries older than the TTL.
89    ///
90    /// Call periodically to prevent unbounded memory growth in long-running sessions.
91    /// Under normal load (100ms TTL, <1000 events/session) the map stays very small
92    /// and eviction is low-priority.
93    pub fn evict_expired(&self) {
94        let now = Instant::now();
95        self.inner.retain(|_, v| now.duration_since(*v) < self.ttl);
96    }
97}
98
99impl Default for DedupStore {
100    fn default() -> Self {
101        Self::new()
102    }
103}
104
105/// Hash a JSON value directly into the hasher with keys sorted at each nesting level.
106///
107/// This avoids allocating intermediate strings or BTreeMaps — the value tree is walked
108/// once, feeding bytes directly into the hasher. Type discriminant tags ensure that
109/// `"42"` (string) and `42` (number) produce different hashes.
110///
111/// Defensive measure: object keys are collected into a Vec and sorted before hashing,
112/// ensuring deterministic output even if `preserve_order` is enabled by a transitive dep.
113fn hash_value(value: &serde_json::Value, hasher: &mut impl Hasher) {
114    match value {
115        serde_json::Value::Null => 0u8.hash(hasher),
116        serde_json::Value::Bool(b) => {
117            1u8.hash(hasher);
118            b.hash(hasher);
119        }
120        serde_json::Value::Number(n) => {
121            2u8.hash(hasher);
122            // Hash the debug representation — covers i64, u64, f64 uniformly
123            format!("{n}").hash(hasher);
124        }
125        serde_json::Value::String(s) => {
126            3u8.hash(hasher);
127            s.hash(hasher);
128        }
129        serde_json::Value::Array(arr) => {
130            4u8.hash(hasher);
131            arr.len().hash(hasher);
132            for item in arr {
133                hash_value(item, hasher);
134            }
135        }
136        serde_json::Value::Object(map) => {
137            5u8.hash(hasher);
138            map.len().hash(hasher);
139            // Sort keys for deterministic hashing regardless of insertion order
140            let mut keys: Vec<&String> = map.keys().collect();
141            keys.sort();
142            for key in keys {
143                key.hash(hasher);
144                hash_value(&map[key], hasher);
145            }
146        }
147    }
148}
149
150#[cfg(test)]
151mod tests {
152    use super::*;
153    use serde_json::json;
154
155    #[test]
156    fn test_first_event_is_not_duplicate() {
157        let store = DedupStore::new();
158        let input = json!({"command": "ls -la"});
159
160        let is_dup = store.check_and_insert("session-1", "bash", &input);
161
162        assert!(!is_dup, "first occurrence must not be a duplicate");
163    }
164
165    #[test]
166    fn test_same_event_within_ttl_is_duplicate() {
167        let store = DedupStore::new();
168        let input = json!({"command": "ls -la"});
169
170        // First call — not a duplicate
171        let first = store.check_and_insert("session-1", "bash", &input);
172        // Second call immediately — within 100ms TTL
173        let second = store.check_and_insert("session-1", "bash", &input);
174
175        assert!(!first, "first occurrence must not be a duplicate");
176        assert!(second, "immediate repeat must be detected as duplicate");
177    }
178
179    #[test]
180    fn test_same_event_after_ttl_is_not_duplicate() {
181        let store = DedupStore::new();
182        let input = json!({"command": "ls -la"});
183
184        store.check_and_insert("session-1", "bash", &input);
185
186        // Wait beyond the 100ms TTL
187        std::thread::sleep(Duration::from_millis(150));
188
189        let after_ttl = store.check_and_insert("session-1", "bash", &input);
190        assert!(!after_ttl, "event after TTL expiry must not be a duplicate");
191    }
192
193    #[test]
194    fn test_different_events_are_not_duplicates() {
195        let store = DedupStore::new();
196
197        let first = store.check_and_insert("session-1", "bash", &json!({"cmd": "ls"}));
198        let second =
199            store.check_and_insert("session-1", "read_file", &json!({"path": "/etc/hosts"}));
200
201        assert!(!first, "first event must not be a duplicate");
202        assert!(!second, "different event must not be a duplicate");
203    }
204
205    #[test]
206    fn test_different_sessions_same_tool_not_duplicate() {
207        let store = DedupStore::new();
208        let input = json!({"command": "ls"});
209
210        store.check_and_insert("session-A", "bash", &input);
211        let second = store.check_and_insert("session-B", "bash", &input);
212
213        assert!(
214            !second,
215            "same tool call in different session must not be a duplicate"
216        );
217    }
218
219    #[test]
220    fn test_evict_expired_removes_old_entries() {
221        let store = DedupStore::new();
222        let input = json!({"key": "value"});
223
224        store.check_and_insert("session-1", "bash", &input);
225
226        // Confirm it's in the map (next call is a duplicate)
227        assert!(
228            store.check_and_insert("session-1", "bash", &input),
229            "entry must exist before eviction"
230        );
231
232        // Wait for TTL to expire, then evict
233        std::thread::sleep(Duration::from_millis(150));
234        store.evict_expired();
235
236        // After eviction, the same event is no longer a duplicate
237        let after_evict = store.check_and_insert("session-1", "bash", &input);
238        assert!(
239            !after_evict,
240            "evicted entry must not be a duplicate on next check"
241        );
242    }
243
244    #[test]
245    fn test_dedup_key_order_independent() {
246        // Two JSON objects with the same keys in different insertion order
247        // must produce the same dedup hash (canonical JSON sorts keys).
248        let store = DedupStore::new();
249
250        // Manually construct objects with different key order using serde_json::Map
251        let mut map_a = serde_json::Map::new();
252        map_a.insert("command".to_string(), json!("ls -la"));
253        map_a.insert("path".to_string(), json!("/tmp"));
254        let input_a = serde_json::Value::Object(map_a);
255
256        let mut map_b = serde_json::Map::new();
257        map_b.insert("path".to_string(), json!("/tmp"));
258        map_b.insert("command".to_string(), json!("ls -la"));
259        let input_b = serde_json::Value::Object(map_b);
260
261        // First call with key order A
262        let first = store.check_and_insert("session-1", "bash", &input_a);
263        // Second call with key order B — must be detected as duplicate
264        let second = store.check_and_insert("session-1", "bash", &input_b);
265
266        assert!(!first, "first occurrence must not be a duplicate");
267        assert!(
268            second,
269            "same logical event with different key order must be detected as duplicate"
270        );
271    }
272
273    #[test]
274    fn test_hash_value_sorts_nested_keys() {
275        // Two objects with identical content but different key order must hash the same
276        let input_a = json!({"z": {"b": 2, "a": 1}, "a": [{"y": 1, "x": 2}]});
277        let input_b = json!({"a": [{"x": 2, "y": 1}], "z": {"a": 1, "b": 2}});
278
279        let mut hasher_a = DefaultHasher::new();
280        let mut hasher_b = DefaultHasher::new();
281        super::hash_value(&input_a, &mut hasher_a);
282        super::hash_value(&input_b, &mut hasher_b);
283
284        assert_eq!(
285            hasher_a.finish(),
286            hasher_b.finish(),
287            "nested keys in different order must produce the same hash"
288        );
289    }
290}