Skip to main content

hyperi_rustlib/worker/engine/
intern.rs

// Project:   hyperi-rustlib
// File:      src/worker/engine/intern.rs
// Purpose:   Concurrent field name interning for the batch processing engine
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Field name interning for the batch processing engine.
//!
//! Deduplicates field name strings across an entire batch. The first occurrence
//! of a field name allocates an `Arc<str>`; all subsequent occurrences get a
//! cheap `Arc::clone` (~2 ns). Thread-safe via `DashMap` -- safe for concurrent
//! access from rayon worker threads.
15
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use dashmap::DashMap;
20use sonic_rs::JsonContainerTrait as _;
21
22/// Concurrent field name interner.
23///
24/// Pre-populate with `with_known_fields` to amortise the cost of the first
25/// `intern` call for the hot-path fields (e.g. `_table`, `_timestamp`).
26///
27/// # Thread safety
28///
29/// `DashMap` shards the hash map into multiple segments; concurrent
30/// reads and writes from rayon worker threads are safe without external locking.
31pub struct FieldInterner {
32    table: DashMap<Arc<str>, ()>,
33}
34
35impl FieldInterner {
36    /// Create a new, empty interner.
37    #[must_use]
38    pub fn new() -> Self {
39        Self {
40            table: DashMap::new(),
41        }
42    }
43
44    /// Create an interner pre-populated with the given field names.
45    ///
46    /// Callers should pass the `known_fields` from [`super::config::BatchProcessingConfig`]
47    /// so that common fields never hit the slow-path allocation during a batch.
48    #[must_use]
49    pub fn with_known_fields(fields: &[&str]) -> Self {
50        let interner = Self::new();
51        for f in fields {
52            let _ = interner.intern(f);
53        }
54        interner
55    }
56
57    /// Intern a field name and return a shared `Arc<str>`.
58    ///
59    /// Fast path (already interned): DashMap shard read + `Arc::clone`, ~20 ns.
60    /// Slow path (first occurrence): DashMap write + `Arc::from` allocation,
61    /// ~100 ns -- taken at most once per unique field name per instance.
62    #[inline]
63    #[must_use]
64    pub fn intern(&self, name: &str) -> Arc<str> {
65        // Fast path: field already interned -- borrow the existing Arc.
66        // Arc<str>: Borrow<str> is in std, so DashMap::get accepts &str directly.
67        if let Some(entry) = self.table.get(name) {
68            return Arc::clone(entry.key());
69        }
70
71        // Slow path: first occurrence -- allocate and insert.
72        let key: Arc<str> = Arc::from(name);
73        self.table.entry(Arc::clone(&key)).or_insert(());
74
75        // Re-read to handle the (rare) concurrent-insert race: two threads may
76        // both miss the fast path and both try to insert. The one that wins
77        // the DashMap shard lock stores its key; the loser's key is dropped.
78        // We always return the canonical key that is present in the map.
79        if let Some(entry) = self.table.get(name) {
80            Arc::clone(entry.key())
81        } else {
82            // Extremely unlikely: the entry we just inserted is somehow gone
83            // (shouldn't happen without external removal). Return our key.
84            key
85        }
86    }
87
88    /// Extract known (pre-interned) fields from a parsed `sonic_rs::Value`.
89    ///
90    /// Iterates the top-level object keys and returns only those that are
91    /// already interned. O(known_fields x object_keys) -- typically
92    /// 6 known x 15 keys = 90 string comparisons per message.
93    ///
94    /// Returns an empty map if `value` is not a JSON object.
95    #[must_use]
96    pub fn extract_known(&self, value: &sonic_rs::Value) -> HashMap<Arc<str>, sonic_rs::Value> {
97        let mut extracted = HashMap::new();
98        if let Some(obj) = value.as_object() {
99            for (key, val) in obj {
100                if let Some(entry) = self.table.get(key) {
101                    let v: sonic_rs::Value = val.clone();
102                    extracted.insert(Arc::clone(entry.key()), v);
103                }
104            }
105        }
106        extracted
107    }
108
109    /// Return the number of interned field names.
110    #[must_use]
111    pub fn len(&self) -> usize {
112        self.table.len()
113    }
114
115    /// Return `true` if no field names have been interned yet.
116    #[must_use]
117    pub fn is_empty(&self) -> bool {
118        self.table.is_empty()
119    }
120}
121
122impl Default for FieldInterner {
123    fn default() -> Self {
124        Self::new()
125    }
126}
127
#[cfg(test)]
mod tests {
    use std::sync::Barrier;
    use std::thread;

    use sonic_rs::JsonValueTrait as _;

    use super::*;

    /// Parse a JSON literal used as a fixture by the `extract_known` tests.
    fn parse(json: &str) -> sonic_rs::Value {
        sonic_rs::from_str(json).expect("test fixture must be valid JSON")
    }

    #[test]
    fn intern_returns_same_arc_for_same_string() {
        let interner = FieldInterner::new();
        let first = interner.intern("_table");
        let second = interner.intern("_table");
        assert!(
            Arc::ptr_eq(&first, &second),
            "expected same Arc for '_table'"
        );
    }

    #[test]
    fn intern_returns_different_arcs_for_different_strings() {
        let interner = FieldInterner::new();
        let table = interner.intern("_table");
        let timestamp = interner.intern("_timestamp");
        assert!(!Arc::ptr_eq(&table, &timestamp));
        assert_eq!(table.as_ref(), "_table");
        assert_eq!(timestamp.as_ref(), "_timestamp");
    }

    #[test]
    fn with_known_fields_prepopulates_table() {
        let interner = FieldInterner::with_known_fields(&["_table", "_timestamp", "host"]);
        assert_eq!(interner.len(), 3);

        // Subsequent intern calls return the same Arc (pointer equality).
        let first = interner.intern("_table");
        let second = interner.intern("_table");
        assert!(Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn extract_known_extracts_matching_fields() {
        let interner = FieldInterner::with_known_fields(&["_table", "host"]);
        let value = parse(r#"{"_table": "events", "host": "web1", "unknown": 42}"#);

        let extracted = interner.extract_known(&value);
        assert_eq!(extracted.len(), 2);

        // Look the values up through the canonical keys.
        let table_key = interner.intern("_table");
        let host_key = interner.intern("host");
        assert_eq!(
            extracted.get(&table_key).and_then(|v| v.as_str()),
            Some("events")
        );
        assert_eq!(
            extracted.get(&host_key).and_then(|v| v.as_str()),
            Some("web1")
        );

        // The field that was never interned must not appear in the result.
        let unknown_key: Arc<str> = Arc::from("unknown");
        assert!(!extracted.contains_key(&unknown_key));
    }

    #[test]
    fn extract_known_ignores_unknown_fields() {
        let interner = FieldInterner::with_known_fields(&["_table"]);
        let value = parse(r#"{"foo": 1, "bar": 2}"#);

        let extracted = interner.extract_known(&value);
        assert!(extracted.is_empty(), "no known fields should be extracted");
    }

    #[test]
    fn extract_known_on_non_object_returns_empty() {
        let interner = FieldInterner::with_known_fields(&["_table"]);
        let value = parse("[1, 2, 3]");

        assert!(interner.extract_known(&value).is_empty());
    }

    #[test]
    fn concurrent_interning_deduplicates_correctly() {
        const FIELD: &str = "_table";
        const NUM_THREADS: usize = 8;

        let interner = Arc::new(FieldInterner::new());
        // Without a rendezvous the threads start one after another and almost
        // never overlap inside `intern`; the barrier releases them together so
        // the first-insert race is actually exercised.
        let start = Arc::new(Barrier::new(NUM_THREADS));

        let handles: Vec<_> = (0..NUM_THREADS)
            .map(|_| {
                let interner = Arc::clone(&interner);
                let start = Arc::clone(&start);
                thread::spawn(move || {
                    start.wait();
                    interner.intern(FIELD)
                })
            })
            .collect();

        let arcs: Vec<Arc<str>> = handles
            .into_iter()
            .map(|handle| handle.join().expect("interning thread panicked"))
            .collect();

        // All threads must have received the same Arc (pointer equality).
        let canonical = &arcs[0];
        for other in &arcs[1..] {
            assert!(
                Arc::ptr_eq(canonical, other),
                "concurrent interning produced different Arc instances"
            );
        }

        // Only one entry should be in the table.
        assert_eq!(interner.len(), 1);
    }

    #[test]
    fn len_and_is_empty() {
        let interner = FieldInterner::new();
        assert!(interner.is_empty());

        let _ = interner.intern("a");
        assert_eq!(interner.len(), 1);
        assert!(!interner.is_empty());

        let _ = interner.intern("b");
        assert_eq!(interner.len(), 2);

        // Repeated intern of existing key does not grow the table.
        let _ = interner.intern("a");
        assert_eq!(interner.len(), 2);
    }
}