Skip to main content

hyperi_rustlib/worker/engine/
intern.rs

// Project:   hyperi-rustlib
// File:      src/worker/engine/intern.rs
// Purpose:   Concurrent field name interning for the batch processing engine
// Language:  Rust
//
// License:   FSL-1.1-ALv2
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Field name interning for the batch processing engine.
//!
//! Deduplicates field name strings across an entire batch. The first occurrence
//! of a field name allocates an `Arc<str>`; every subsequent occurrence costs one
//! concurrent hash lookup plus a cheap `Arc::clone` (~2 ns for the clone itself).
//! Thread-safe via `DashMap` -- safe for concurrent access from rayon worker
//! threads.

16use std::collections::HashMap;
17use std::sync::Arc;
18
19use dashmap::DashMap;
20use sonic_rs::JsonContainerTrait as _;
21
22/// Concurrent field name interner.
23///
24/// Pre-populate with `with_known_fields` to amortise the cost of the first
25/// `intern` call for the hot-path fields (e.g. `_table`, `_timestamp`).
26///
27/// # Thread safety
28///
29/// `DashMap` shards the hash map into multiple segments; concurrent
30/// reads and writes from rayon worker threads are safe without external locking.
31pub struct FieldInterner {
32    table: DashMap<Arc<str>, ()>,
33}
34
35impl FieldInterner {
36    /// Create a new, empty interner.
37    #[must_use]
38    pub fn new() -> Self {
39        Self {
40            table: DashMap::new(),
41        }
42    }
43
44    /// Create an interner pre-populated with the given field names.
45    ///
46    /// Callers should pass the `known_fields` from [`super::config::BatchProcessingConfig`]
47    /// so that common fields never hit the slow-path allocation during a batch.
48    #[must_use]
49    pub fn with_known_fields(fields: &[&str]) -> Self {
50        let interner = Self::new();
51        for f in fields {
52            let _ = interner.intern(f);
53        }
54        interner
55    }
56
57    /// Intern a field name and return a shared `Arc<str>`.
58    ///
59    /// # Cost model
60    ///
61    /// - Fast path (already interned): one DashMap read-lock shard + `Arc::clone` → ~20 ns
62    /// - Slow path (first occurrence): one write to DashMap + `Arc::from` allocation → ~100 ns
63    ///
64    /// The slow path is taken at most once per unique field name per `FieldInterner` instance.
65    #[inline]
66    #[must_use]
67    pub fn intern(&self, name: &str) -> Arc<str> {
68        // Fast path: field already interned -- borrow the existing Arc.
69        // Arc<str>: Borrow<str> is in std, so DashMap::get accepts &str directly.
70        if let Some(entry) = self.table.get(name) {
71            return Arc::clone(entry.key());
72        }
73
74        // Slow path: first occurrence -- allocate and insert.
75        let key: Arc<str> = Arc::from(name);
76        self.table.entry(Arc::clone(&key)).or_insert(());
77
78        // Re-read to handle the (rare) concurrent-insert race: two threads may
79        // both miss the fast path and both try to insert. The one that wins
80        // the DashMap shard lock stores its key; the loser's key is dropped.
81        // We always return the canonical key that is present in the map.
82        if let Some(entry) = self.table.get(name) {
83            Arc::clone(entry.key())
84        } else {
85            // Extremely unlikely: the entry we just inserted is somehow gone
86            // (shouldn't happen without external removal). Return our key.
87            key
88        }
89    }
90
91    /// Extract known (pre-interned) fields from a parsed `sonic_rs::Value`.
92    ///
93    /// Iterates the top-level object keys and returns only those that are
94    /// already interned. O(known_fields × object_keys) -- typically
95    /// 6 known × 15 keys = 90 string comparisons per message.
96    ///
97    /// Returns an empty map if `value` is not a JSON object.
98    #[must_use]
99    pub fn extract_known(&self, value: &sonic_rs::Value) -> HashMap<Arc<str>, sonic_rs::Value> {
100        let mut extracted = HashMap::new();
101        if let Some(obj) = value.as_object() {
102            for (key, val) in obj {
103                if let Some(entry) = self.table.get(key) {
104                    let v: sonic_rs::Value = val.clone();
105                    extracted.insert(Arc::clone(entry.key()), v);
106                }
107            }
108        }
109        extracted
110    }
111
112    /// Return the number of interned field names.
113    #[must_use]
114    pub fn len(&self) -> usize {
115        self.table.len()
116    }
117
118    /// Return `true` if no field names have been interned yet.
119    #[must_use]
120    pub fn is_empty(&self) -> bool {
121        self.table.is_empty()
122    }
123}
124
125impl Default for FieldInterner {
126    fn default() -> Self {
127        Self::new()
128    }
129}
130
#[cfg(test)]
mod tests {
    use std::sync::Barrier;
    use std::thread;

    use sonic_rs::JsonValueTrait as _;

    use super::*;

    #[test]
    fn intern_returns_same_arc_for_same_string() {
        let interner = FieldInterner::new();
        let a = interner.intern("_table");
        let b = interner.intern("_table");
        assert!(Arc::ptr_eq(&a, &b), "expected same Arc for '_table'");
    }

    #[test]
    fn intern_returns_different_arcs_for_different_strings() {
        let interner = FieldInterner::new();
        let a = interner.intern("_table");
        let b = interner.intern("_timestamp");
        assert!(!Arc::ptr_eq(&a, &b));
        assert_eq!(a.as_ref(), "_table");
        assert_eq!(b.as_ref(), "_timestamp");
    }

    #[test]
    fn with_known_fields_prepopulates_table() {
        let fields = ["_table", "_timestamp", "host"];
        let interner = FieldInterner::with_known_fields(&fields);
        assert_eq!(interner.len(), 3);

        // Subsequent intern calls return the same Arc (pointer equality).
        let a = interner.intern("_table");
        let b = interner.intern("_table");
        assert!(Arc::ptr_eq(&a, &b));
        // Interning a pre-populated name must not grow the table.
        assert_eq!(interner.len(), 3);
    }

    #[test]
    fn with_known_fields_deduplicates_input() {
        let interner = FieldInterner::with_known_fields(&["host", "host", "_table"]);
        assert_eq!(interner.len(), 2);
    }

    #[test]
    fn with_known_fields_empty_slice_yields_empty_interner() {
        let interner = FieldInterner::with_known_fields(&[]);
        assert!(interner.is_empty());
    }

    #[test]
    fn extract_known_extracts_matching_fields() {
        let interner = FieldInterner::with_known_fields(&["_table", "host"]);
        let value: sonic_rs::Value =
            sonic_rs::from_str(r#"{"_table": "events", "host": "web1", "unknown": 42}"#).unwrap();

        let extracted = interner.extract_known(&value);

        assert_eq!(extracted.len(), 2);

        // Verify extracted values.
        let table_key: Arc<str> = interner.intern("_table");
        let host_key: Arc<str> = interner.intern("host");
        assert_eq!(
            extracted.get(&table_key).and_then(|v| v.as_str()),
            Some("events")
        );
        assert_eq!(
            extracted.get(&host_key).and_then(|v| v.as_str()),
            Some("web1")
        );
        // Unknown field was not extracted (Arc<str>: Borrow<str>, so no allocation needed).
        assert!(!extracted.contains_key("unknown"));
    }

    #[test]
    fn extract_known_returns_canonical_keys() {
        let interner = FieldInterner::with_known_fields(&["_table"]);
        let value: sonic_rs::Value = sonic_rs::from_str(r#"{"_table": "events"}"#).unwrap();

        let extracted = interner.extract_known(&value);
        let canonical = interner.intern("_table");
        let (key, _) = extracted
            .get_key_value("_table")
            .expect("_table should be extracted");
        assert!(
            Arc::ptr_eq(key, &canonical),
            "extracted key should be the interned Arc"
        );
    }

    #[test]
    fn extract_known_ignores_unknown_fields() {
        let interner = FieldInterner::with_known_fields(&["_table"]);
        let value: sonic_rs::Value = sonic_rs::from_str(r#"{"foo": 1, "bar": 2}"#).unwrap();

        let extracted = interner.extract_known(&value);
        assert!(extracted.is_empty(), "no known fields should be extracted");
    }

    #[test]
    fn extract_known_on_non_object_returns_empty() {
        let interner = FieldInterner::with_known_fields(&["_table"]);
        let value: sonic_rs::Value = sonic_rs::from_str("[1, 2, 3]").unwrap();
        let extracted = interner.extract_known(&value);
        assert!(extracted.is_empty());
    }

    #[test]
    fn concurrent_interning_deduplicates_correctly() {
        const NUM_THREADS: usize = 8;
        let field = "_table";

        let interner = Arc::new(FieldInterner::new());
        // Every thread waits on the barrier and then interns at the same time.
        // This makes the racing first-insert path far more likely to run than
        // with staggered spawns, where later threads just hit the fast path.
        let barrier = Arc::new(Barrier::new(NUM_THREADS));

        let handles: Vec<_> = (0..NUM_THREADS)
            .map(|_| {
                let interner = Arc::clone(&interner);
                let barrier = Arc::clone(&barrier);
                thread::spawn(move || {
                    barrier.wait();
                    interner.intern(field)
                })
            })
            .collect();

        let arcs: Vec<Arc<str>> = handles
            .into_iter()
            .map(|h| h.join().expect("interning thread panicked"))
            .collect();

        // All threads must have received the same Arc (pointer equality).
        let first = &arcs[0];
        for arc in &arcs[1..] {
            assert!(
                Arc::ptr_eq(first, arc),
                "concurrent interning produced different Arc instances"
            );
        }

        // Only one entry should be in the table.
        assert_eq!(interner.len(), 1);
    }

    #[test]
    fn len_and_is_empty() {
        let interner = FieldInterner::new();
        assert!(interner.is_empty());
        let _ = interner.intern("a");
        assert_eq!(interner.len(), 1);
        assert!(!interner.is_empty());
        let _ = interner.intern("b");
        assert_eq!(interner.len(), 2);
        // Repeated intern of existing key does not grow the table.
        let _ = interner.intern("a");
        assert_eq!(interner.len(), 2);
    }
}