// reifydb_sdk/connector/source.rs
use std::{collections::HashMap, sync::mpsc::SyncSender};

use reifydb_core::value::column::columns::Columns;
use reifydb_type::value::Value;

use crate::{
    error::{FFIError, Result},
    operator::column::OperatorColumn,
};

/// Delivery style of a source.
///
/// Each source advertises its mode through [`FFISourceMetadata::MODE`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceMode {
    /// Pull-style delivery.
    ///
    /// NOTE(review): presumably the host drives such a source by calling
    /// [`FFISource::poll`]; confirm against the host runtime.
    Pull,
    /// Push-style delivery.
    ///
    /// NOTE(review): presumably the host calls [`FFISource::run`] and the source
    /// pushes batches through a [`SourceEmitter`]; confirm against the host runtime.
    Push,
}

23#[derive(Debug)]
25pub struct SourceBatch {
26 pub columns: Columns,
28 pub checkpoint: Option<Vec<u8>>,
30}
31
32impl SourceBatch {
33 pub fn empty() -> Self {
34 Self {
35 columns: Columns::empty(),
36 checkpoint: None,
37 }
38 }
39
40 pub fn is_empty(&self) -> bool {
41 !self.columns.has_rows()
42 }
43}
44
45pub trait FFISourceMetadata {
47 const NAME: &'static str;
49 const VERSION: &'static str;
51 const DESCRIPTION: &'static str;
53 const MODE: SourceMode;
55 const OUTPUT_COLUMNS: &'static [OperatorColumn];
57}
58
59pub trait FFISource: Send + 'static {
61 fn new(config: &HashMap<String, Value>) -> Result<Self>
63 where
64 Self: Sized;
65
66 fn poll(&mut self, checkpoint: Option<&[u8]>) -> Result<SourceBatch>;
70
71 fn run(&mut self, checkpoint: Option<&[u8]>, emitter: SourceEmitter) -> Result<()>;
74
75 fn shutdown(&mut self) -> Result<()>;
77}
78
79pub struct SourceEmitter {
81 sender: SyncSender<SourceBatch>,
82}
83
84impl SourceEmitter {
85 pub fn new(sender: SyncSender<SourceBatch>) -> Self {
86 Self {
87 sender,
88 }
89 }
90
91 pub fn emit(&self, batch: SourceBatch) -> Result<()> {
93 self.sender.send(batch).map_err(|_| FFIError::Other("source emitter channel closed".to_string()))
94 }
95}
96
97pub trait FFISourceWithMetadata: FFISource + FFISourceMetadata {}
99impl<T> FFISourceWithMetadata for T where T: FFISource + FFISourceMetadata {}