//! Source and sink catalog for tracking registered streaming objects.
#![allow(clippy::disallowed_types)] // cold path

use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use parking_lot::RwLock;
use tokio::sync::Notify;

use laminar_core::streaming::{self, BackpressureStrategy, SourceConfig, WaitStrategy};

/// Internal record type for untyped sources (stores raw `RecordBatch`).
///
/// Acts as a type-erasure vehicle so the generic streaming channel can carry
/// arbitrary Arrow data; the real schema lives on the owning `SourceEntry`.
#[derive(Clone, Debug)]
pub(crate) struct ArrowRecord {
    /// The record batch carried through the channel.
    pub(crate) batch: RecordBatch,
}

23impl laminar_core::streaming::Record for ArrowRecord {
24    fn schema() -> SchemaRef {
25        // This is a placeholder; the actual schema is on the SourceEntry.
26        // ArrowRecord is only used as a type parameter; push_arrow bypasses this.
27        Arc::new(arrow::datatypes::Schema::empty())
28    }
29
30    fn to_record_batch(&self) -> RecordBatch {
31        self.batch.clone()
32    }
33}
34
/// Bounded ring buffer for snapshot batches.
///
/// Uses an atomic tail counter (`fetch_add`) so concurrent `push()`
/// calls from multiple threads each get a unique slot — no lost writes.
/// Per-slot `parking_lot::Mutex` protects the actual slot write/read.
struct SnapshotRing {
    /// One mutex-guarded slot per ring position; `None` until first written.
    slots: Box<[parking_lot::Mutex<Option<RecordBatch>>]>,
    /// Monotonically increasing write counter. `tail % capacity` = next slot.
    tail: AtomicUsize,
    /// Number of slots; always >= 1 (see `SnapshotRing::new`).
    capacity: usize,
}

47impl SnapshotRing {
48    fn new(capacity: usize) -> Self {
49        let cap = capacity.max(1);
50        let slots: Vec<_> = (0..cap).map(|_| parking_lot::Mutex::new(None)).collect();
51        Self {
52            slots: slots.into_boxed_slice(),
53            tail: AtomicUsize::new(0),
54            capacity: cap,
55        }
56    }
57
58    fn push(&self, batch: RecordBatch) {
59        // fetch_add is atomic — concurrent pushers each get a unique slot.
60        let idx = self.tail.fetch_add(1, Ordering::Relaxed) % self.capacity;
61        *self.slots[idx].lock() = Some(batch);
62    }
63
64    fn snapshot(&self) -> Vec<RecordBatch> {
65        let tail = self.tail.load(Ordering::Acquire);
66        let count = tail.min(self.capacity);
67        // Read the most recent `count` slots, oldest first.
68        let start = if tail <= self.capacity {
69            0
70        } else {
71            tail % self.capacity
72        };
73        let mut result = Vec::with_capacity(count);
74        for i in 0..count {
75            let idx = (start + i) % self.capacity;
76            if let Some(batch) = self.slots[idx].lock().as_ref() {
77                result.push(batch.clone());
78            }
79        }
80        result
81    }
82}
83
/// A registered source in the catalog.
///
/// Bundles the declared Arrow schema and watermark settings with the live
/// streaming endpoints and a bounded snapshot ring for ad-hoc queries.
pub struct SourceEntry {
    /// Source name.
    pub name: String,
    /// Arrow schema.
    pub schema: SchemaRef,
    /// Watermark column name, if configured.
    pub watermark_column: Option<String>,
    /// Maximum out-of-orderness for watermark generation.
    pub max_out_of_orderness: Option<Duration>,
    /// Whether this source uses processing-time watermarks (`PROCTIME()`).
    /// Atomic so it can be flipped after registration without locking.
    pub is_processing_time: std::sync::atomic::AtomicBool,
    /// The underlying streaming source (type-erased via `ArrowRecord`).
    pub(crate) source: streaming::Source<ArrowRecord>,
    /// The underlying streaming sink (type-erased via `ArrowRecord`).
    pub(crate) sink: streaming::Sink<ArrowRecord>,
    /// Bounded ring buffer of recent batches for ad-hoc snapshot queries
    /// (atomic tail counter plus per-slot mutexes — see `SnapshotRing`).
    buffer: SnapshotRing,
    /// Notification handle for event-driven wakeup on `db.insert()`.
    data_notify: Arc<Notify>,
}

106impl SourceEntry {
107    /// Push a batch to both the SPSC channel and the snapshot buffer.
108    ///
109    /// The snapshot buffer is bounded — oldest batches are dropped when
110    /// capacity is exceeded. The SPSC push is the primary delivery path;
111    /// the snapshot ring is only for ad-hoc queries.
112    pub(crate) fn push_and_buffer(
113        &self,
114        batch: RecordBatch,
115    ) -> Result<(), laminar_core::streaming::StreamingError> {
116        self.source.push_arrow(batch.clone())?;
117        self.buffer.push(batch);
118        // notify_one() stores a permit so the CatalogSourceConnector
119        // IO thread wakes immediately. Assumes exactly one IO thread per
120        // source — if multiple consumers are added, switch to notify_waiters().
121        self.data_notify.notify_one();
122        Ok(())
123    }
124
125    /// Return a snapshot of all buffered batches for ad-hoc queries.
126    pub(crate) fn snapshot(&self) -> Vec<RecordBatch> {
127        self.buffer.snapshot()
128    }
129
130    /// Get the notification handle for event-driven wakeup on data insertion.
131    pub(crate) fn data_notify(&self) -> Arc<Notify> {
132        Arc::clone(&self.data_notify)
133    }
134}
135
/// A registered sink in the catalog.
///
/// Currently metadata-only: records which source/table feeds the sink.
#[allow(dead_code)]
pub(crate) struct SinkEntry {
    /// Sink name.
    pub(crate) name: String,
    /// Input source or table name.
    pub(crate) input: String,
}

/// A registered query.
pub(crate) struct QueryEntry {
    /// Query identifier (allocated from the catalog's `next_query_id`).
    pub(crate) id: u64,
    /// Human-readable name or SQL text.
    pub(crate) sql: String,
    /// Whether the query is still active (cleared by `deactivate_query`).
    pub(crate) active: bool,
}

/// A registered stream in the catalog.
#[allow(dead_code)]
pub(crate) struct StreamEntry {
    /// Stream name.
    pub(crate) name: String,
    /// The underlying streaming source (for pushing data into the stream).
    pub(crate) source: streaming::Source<ArrowRecord>,
    /// The underlying streaming sink (for subscribing to the stream).
    pub(crate) sink: streaming::Sink<ArrowRecord>,
}

/// Catalog of registered sources, sinks, streams, and queries.
///
/// Each map is guarded by its own `parking_lot::RwLock`: lookups take read
/// locks, registration and removal take write locks.
pub struct SourceCatalog {
    /// Registered sources by name.
    sources: RwLock<HashMap<String, Arc<SourceEntry>>>,
    /// Registered sinks by name.
    sinks: RwLock<HashMap<String, SinkEntry>>,
    /// Registered streams by name.
    streams: RwLock<HashMap<String, Arc<StreamEntry>>>,
    /// Registered queries keyed by ID.
    queries: RwLock<HashMap<u64, QueryEntry>>,
    /// Next query ID to hand out (starts at 1).
    next_query_id: AtomicU64,
    /// Default channel buffer size for entries that don't override it.
    default_buffer_size: usize,
    /// Default backpressure strategy for entries that don't override it.
    default_backpressure: BackpressureStrategy,
}

impl SourceCatalog {
    /// Create a new, empty catalog using `buffer_size` and `backpressure`
    /// as the channel defaults for sources and streams that don't override
    /// them at registration time.
    #[must_use]
    pub fn new(buffer_size: usize, backpressure: BackpressureStrategy) -> Self {
        Self {
            sources: RwLock::default(),
            sinks: RwLock::default(),
            streams: RwLock::default(),
            queries: RwLock::default(),
            // Query IDs start at 1.
            next_query_id: AtomicU64::new(1),
            default_buffer_size: buffer_size,
            default_backpressure: backpressure,
        }
    }

192    /// Register a source from a SQL CREATE SOURCE definition.
193    #[allow(clippy::too_many_arguments)]
194    pub(crate) fn register_source(
195        &self,
196        name: &str,
197        schema: SchemaRef,
198        watermark_column: Option<String>,
199        max_out_of_orderness: Option<Duration>,
200        buffer_size: Option<usize>,
201        backpressure: Option<BackpressureStrategy>,
202    ) -> Result<Arc<SourceEntry>, crate::DbError> {
203        let mut sources = self.sources.write();
204        if sources.contains_key(name) {
205            return Err(crate::DbError::SourceAlreadyExists(name.to_string()));
206        }
207
208        let buf_size = buffer_size.unwrap_or(self.default_buffer_size);
209        let bp = backpressure.unwrap_or(self.default_backpressure);
210
211        let config = SourceConfig {
212            channel: streaming::ChannelConfig {
213                buffer_size: buf_size,
214                backpressure: bp,
215                wait_strategy: WaitStrategy::SpinYield,
216                track_stats: false,
217            },
218            name: Some(name.to_string()),
219        };
220
221        let (source, sink) = streaming::create_with_config::<ArrowRecord>(config);
222
223        let entry = Arc::new(SourceEntry {
224            name: name.to_string(),
225            schema,
226            watermark_column,
227            max_out_of_orderness,
228            is_processing_time: std::sync::atomic::AtomicBool::new(false),
229            source,
230            sink,
231            buffer: SnapshotRing::new(buf_size),
232            data_notify: Arc::new(Notify::new()),
233        });
234
235        sources.insert(name.to_string(), Arc::clone(&entry));
236        Ok(entry)
237    }
238
239    /// Register a source, replacing if it already exists.
240    pub(crate) fn register_source_or_replace(
241        &self,
242        name: &str,
243        schema: SchemaRef,
244        watermark_column: Option<String>,
245        max_out_of_orderness: Option<Duration>,
246        buffer_size: Option<usize>,
247        backpressure: Option<BackpressureStrategy>,
248    ) -> Arc<SourceEntry> {
249        // Remove existing if present
250        self.sources.write().remove(name);
251        // Safe to unwrap since we just removed any conflict
252        self.register_source(
253            name,
254            schema,
255            watermark_column,
256            max_out_of_orderness,
257            buffer_size,
258            backpressure,
259        )
260        .unwrap()
261    }
262
263    /// Get a registered source by name.
264    pub fn get_source(&self, name: &str) -> Option<Arc<SourceEntry>> {
265        self.sources.read().get(name).cloned()
266    }
267
268    /// Remove a source by name.
269    pub fn drop_source(&self, name: &str) -> bool {
270        self.sources.write().remove(name).is_some()
271    }
272
273    /// Register a sink.
274    pub(crate) fn register_sink(&self, name: &str, input: &str) -> Result<(), crate::DbError> {
275        let mut sinks = self.sinks.write();
276        if sinks.contains_key(name) {
277            return Err(crate::DbError::SinkAlreadyExists(name.to_string()));
278        }
279        sinks.insert(
280            name.to_string(),
281            SinkEntry {
282                name: name.to_string(),
283                input: input.to_string(),
284            },
285        );
286        Ok(())
287    }
288
289    /// Remove a sink by name.
290    pub fn drop_sink(&self, name: &str) -> bool {
291        self.sinks.write().remove(name).is_some()
292    }
293
294    /// Register a named stream.
295    pub(crate) fn register_stream(&self, name: &str) -> Result<(), crate::DbError> {
296        let mut streams = self.streams.write();
297        if streams.contains_key(name) {
298            return Err(crate::DbError::StreamAlreadyExists(name.to_string()));
299        }
300
301        let config = SourceConfig {
302            channel: streaming::ChannelConfig {
303                buffer_size: self.default_buffer_size,
304                backpressure: self.default_backpressure,
305                wait_strategy: WaitStrategy::SpinYield,
306                track_stats: false,
307            },
308            name: Some(name.to_string()),
309        };
310
311        let (source, sink) = streaming::create_with_config::<ArrowRecord>(config);
312
313        streams.insert(
314            name.to_string(),
315            Arc::new(StreamEntry {
316                name: name.to_string(),
317                source,
318                sink,
319            }),
320        );
321        Ok(())
322    }
323
324    /// Get a subscription to a named stream.
325    pub(crate) fn get_stream_subscription(
326        &self,
327        name: &str,
328    ) -> Option<streaming::Subscription<ArrowRecord>> {
329        self.streams
330            .read()
331            .get(name)
332            .map(|entry| entry.sink.subscribe())
333    }
334
335    /// Get a stream entry by name.
336    pub(crate) fn get_stream_entry(&self, name: &str) -> Option<Arc<StreamEntry>> {
337        self.streams.read().get(name).cloned()
338    }
339
340    /// Get a clone of the stream's source handle (for pushing results).
341    pub(crate) fn get_stream_source(&self, name: &str) -> Option<streaming::Source<ArrowRecord>> {
342        self.streams
343            .read()
344            .get(name)
345            .map(|entry| entry.source.clone())
346    }
347
348    /// Remove a stream by name.
349    pub fn drop_stream(&self, name: &str) -> bool {
350        self.streams.write().remove(name).is_some()
351    }
352
353    /// List all stream names.
354    pub fn list_streams(&self) -> Vec<String> {
355        self.streams.read().keys().cloned().collect()
356    }
357
358    /// List all source names.
359    pub fn list_sources(&self) -> Vec<String> {
360        self.sources.read().keys().cloned().collect()
361    }
362
363    /// List all sink names.
364    pub fn list_sinks(&self) -> Vec<String> {
365        self.sinks.read().keys().cloned().collect()
366    }
367
368    /// Get the input name for a registered sink.
369    pub fn get_sink_input(&self, name: &str) -> Option<String> {
370        self.sinks.read().get(name).map(|e| e.input.clone())
371    }
372
373    /// Register a query and return its ID.
374    pub(crate) fn register_query(&self, sql: &str) -> u64 {
375        let id = self.next_query_id.fetch_add(1, Ordering::Relaxed);
376        let mut queries = self.queries.write();
377        queries.insert(
378            id,
379            QueryEntry {
380                id,
381                sql: sql.to_string(),
382                active: true,
383            },
384        );
385        id
386    }
387
388    /// Mark a query as inactive. Returns `true` if the query existed.
389    pub(crate) fn deactivate_query(&self, id: u64) -> bool {
390        if let Some(entry) = self.queries.write().get_mut(&id) {
391            entry.active = false;
392            true
393        } else {
394            false
395        }
396    }
397
398    /// List all queries.
399    pub(crate) fn list_queries(&self) -> Vec<(u64, String, bool)> {
400        self.queries
401            .read()
402            .values()
403            .map(|q| (q.id, q.sql.clone(), q.active))
404            .collect()
405    }
406
407    /// Get source schema for DESCRIBE.
408    pub fn describe_source(&self, name: &str) -> Option<SchemaRef> {
409        self.sources.read().get(name).map(|e| e.schema.clone())
410    }
411}
412
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::{DataType, Field, Schema};

    /// Two-column schema (`id: Int64`, `value: Float64`) shared by the tests.
    fn test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("value", DataType::Float64, false),
        ]))
    }

    #[test]
    fn test_register_source() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        let result = catalog.register_source("test", test_schema(), None, None, None, None);
        assert!(result.is_ok());
        assert!(catalog.get_source("test").is_some());
    }

    #[test]
    fn test_register_duplicate_source() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        catalog
            .register_source("test", test_schema(), None, None, None, None)
            .unwrap();
        // Second registration of the same name must fail with the typed error.
        let result = catalog.register_source("test", test_schema(), None, None, None, None);
        assert!(matches!(
            result,
            Err(crate::DbError::SourceAlreadyExists(_))
        ));
    }

    #[test]
    fn test_drop_source() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        catalog
            .register_source("test", test_schema(), None, None, None, None)
            .unwrap();
        assert!(catalog.drop_source("test"));
        assert!(catalog.get_source("test").is_none());
    }

    #[test]
    fn test_list_sources() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        catalog
            .register_source("a", test_schema(), None, None, None, None)
            .unwrap();
        catalog
            .register_source("b", test_schema(), None, None, None, None)
            .unwrap();
        // list_sources is unordered (HashMap iteration), so sort before comparing.
        let mut names = catalog.list_sources();
        names.sort();
        assert_eq!(names, vec!["a", "b"]);
    }

    #[test]
    fn test_register_sink() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        assert!(catalog.register_sink("output", "events").is_ok());
        assert_eq!(catalog.list_sinks(), vec!["output"]);
    }

    #[test]
    fn test_register_query() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        let id = catalog.register_query("SELECT * FROM events");
        // IDs start at 1 (see SourceCatalog::new).
        assert_eq!(id, 1);
        let queries = catalog.list_queries();
        assert_eq!(queries.len(), 1);
        assert!(queries[0].2); // active
    }

    #[test]
    fn test_deactivate_query() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        let id = catalog.register_query("SELECT * FROM events");
        catalog.deactivate_query(id);
        let queries = catalog.list_queries();
        assert!(!queries[0].2); // inactive
    }

    #[test]
    fn test_describe_source() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        let schema = test_schema();
        catalog
            .register_source("test", schema.clone(), None, None, None, None)
            .unwrap();
        let result = catalog.describe_source("test");
        assert!(result.is_some());
        assert_eq!(result.unwrap().fields().len(), 2);
    }

    #[test]
    fn test_or_replace() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        catalog
            .register_source("test", test_schema(), None, None, None, None)
            .unwrap();
        // Re-registering with or_replace must succeed and pick up new settings.
        let entry = catalog.register_source_or_replace(
            "test",
            test_schema(),
            Some("ts".into()),
            None,
            None,
            None,
        );
        assert_eq!(entry.watermark_column, Some("ts".to_string()));
    }

    #[test]
    fn test_push_and_buffer_snapshot() {
        let catalog = SourceCatalog::new(1024, BackpressureStrategy::Block);
        let schema = test_schema();
        let entry = catalog
            .register_source("test", schema.clone(), None, None, None, None)
            .unwrap();

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(arrow::array::Int64Array::from(vec![1])),
                Arc::new(arrow::array::Float64Array::from(vec![1.5])),
            ],
        )
        .unwrap();

        entry.push_and_buffer(batch).unwrap();
        let snap = entry.snapshot();
        assert_eq!(snap.len(), 1);
        assert_eq!(snap[0].num_rows(), 1);
    }

    #[test]
    fn test_buffer_capacity_drops_oldest() {
        // Use a small buffer size so we can test overflow
        let catalog = SourceCatalog::new(2, BackpressureStrategy::DropOldest);
        let schema = test_schema();
        let entry = catalog
            .register_source("test", schema.clone(), None, None, None, None)
            .unwrap();

        // Push three single-row batches into a 2-slot ring.
        let values: [(i64, f64); 3] = [(0, 1.0), (1, 2.0), (2, 3.0)];
        for (id, val) in values {
            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(arrow::array::Int64Array::from(vec![id])),
                    Arc::new(arrow::array::Float64Array::from(vec![val])),
                ],
            )
            .unwrap();
            entry.push_and_buffer(batch).unwrap();
        }

        let snap = entry.snapshot();
        // buffer_capacity=2, so only the last 2 batches should remain
        assert_eq!(snap.len(), 2);
        let col = snap[0]
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::Int64Array>()
            .unwrap();
        assert_eq!(col.value(0), 1); // batch 0 was dropped
    }
}