Skip to main content

reifydb_sdk/connector/
source.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2025 ReifyDB
3
4use std::{collections::HashMap, sync::mpsc::SyncSender};
5
6use reifydb_core::value::column::columns::Columns;
7use reifydb_type::value::Value;
8
9use crate::{
10	error::{FFIError, Result},
11	operator::column::OperatorColumn,
12};
13
/// Whether a source connector operates in pull mode or in push mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceMode {
	/// The source is polled on an interval (e.g., database query, S3 scan).
	Pull,
	/// The source runs continuously and pushes records
	/// (e.g., Kafka consumer, MQTT subscriber).
	Push,
}
22
23/// A batch of records produced by a source connector
24#[derive(Debug)]
25pub struct SourceBatch {
26	/// The columnar data
27	pub columns: Columns,
28	/// Opaque checkpoint for resumption - format is connector-defined
29	pub checkpoint: Option<Vec<u8>>,
30}
31
32impl SourceBatch {
33	pub fn empty() -> Self {
34		Self {
35			columns: Columns::empty(),
36			checkpoint: None,
37		}
38	}
39
40	pub fn is_empty(&self) -> bool {
41		!self.columns.has_rows()
42	}
43}
44
45/// Static metadata about a source connector type
46pub trait FFISourceMetadata {
47	/// Connector name (e.g., "postgres", "kafka", "mqtt")
48	const NAME: &'static str;
49	/// Semantic version (e.g., "1.0.0")
50	const VERSION: &'static str;
51	/// Human-readable description
52	const DESCRIPTION: &'static str;
53	/// Pull or Push mode
54	const MODE: SourceMode;
55	/// Shape of records this source produces
56	const OUTPUT_COLUMNS: &'static [OperatorColumn];
57}
58
59/// Runtime behavior of a source connector
60pub trait FFISource: Send + 'static {
61	/// Create a new source instance from config
62	fn new(config: &HashMap<String, Value>) -> Result<Self>
63	where
64		Self: Sized;
65
66	/// Pull mode: fetch the next batch of records.
67	/// Called on the configured poll interval.
68	/// `checkpoint` is the last committed checkpoint (None on first poll).
69	fn poll(&mut self, checkpoint: Option<&[u8]>) -> Result<SourceBatch>;
70
71	/// Push mode: run continuously, calling `emitter.emit()` for each batch.
72	/// Blocks until shutdown or error.
73	fn run(&mut self, checkpoint: Option<&[u8]>, emitter: SourceEmitter) -> Result<()>;
74
75	/// Graceful shutdown
76	fn shutdown(&mut self) -> Result<()>;
77}
78
79/// Channel for push-mode sources to emit batches
80pub struct SourceEmitter {
81	sender: SyncSender<SourceBatch>,
82}
83
84impl SourceEmitter {
85	pub fn new(sender: SyncSender<SourceBatch>) -> Self {
86		Self {
87			sender,
88		}
89	}
90
91	/// Emit a batch of records. Blocks if the channel is full (backpressure).
92	pub fn emit(&self, batch: SourceBatch) -> Result<()> {
93		self.sender.send(batch).map_err(|_| FFIError::Other("source emitter channel closed".to_string()))
94	}
95}
96
97/// Blanket trait combining metadata and runtime behavior
98pub trait FFISourceWithMetadata: FFISource + FFISourceMetadata {}
99impl<T> FFISourceWithMetadata for T where T: FFISource + FFISourceMetadata {}