use reifydb_core::{
common::{JoinType, WindowKind},
interface::catalog::{
flow::{FlowEdgeId, FlowId, FlowNodeId},
id::{RingBufferId, SeriesId, SubscriptionId, TableId, ViewId},
series::SeriesKey,
},
sort::SortKey,
};
use serde::{Deserialize, Serialize};
use crate::expression::Expression;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FlowNodeType {
SourceInlineData {},
SourceTable {
table: TableId,
},
SourceView {
view: ViewId,
},
SourceFlow {
flow: FlowId,
},
SourceRingBuffer {
ringbuffer: RingBufferId,
},
SourceSeries {
series: SeriesId,
},
Filter {
conditions: Vec<Expression>,
},
Gate {
conditions: Vec<Expression>,
},
Map {
expressions: Vec<Expression>,
},
Extend {
expressions: Vec<Expression>,
},
Join {
join_type: JoinType,
left: Vec<Expression>,
right: Vec<Expression>,
alias: Option<String>,
},
Aggregate {
by: Vec<Expression>,
map: Vec<Expression>,
},
Append,
Sort {
by: Vec<SortKey>,
},
Take {
limit: usize,
},
Distinct {
expressions: Vec<Expression>,
},
Apply {
operator: String,
expressions: Vec<Expression>,
},
SinkTableView {
view: ViewId,
table: TableId,
},
SinkRingBufferView {
view: ViewId,
ringbuffer: RingBufferId,
capacity: u64,
propagate_evictions: bool,
},
SinkSeriesView {
view: ViewId,
series: SeriesId,
key: SeriesKey,
},
SinkSubscription {
subscription: SubscriptionId,
},
Window {
kind: WindowKind,
group_by: Vec<Expression>,
aggregations: Vec<Expression>,
ts: Option<String>,
},
}
impl FlowNodeType {
pub fn discriminator(&self) -> u8 {
match self {
FlowNodeType::SourceInlineData {
..
} => 0,
FlowNodeType::SourceTable {
..
} => 1,
FlowNodeType::SourceView {
..
} => 2,
FlowNodeType::SourceFlow {
..
} => 3,
FlowNodeType::SourceRingBuffer {
..
} => 17,
FlowNodeType::Filter {
..
} => 4,
FlowNodeType::Gate {
..
} => 19,
FlowNodeType::Map {
..
} => 5,
FlowNodeType::Extend {
..
} => 6,
FlowNodeType::Join {
..
} => 7,
FlowNodeType::Aggregate {
..
} => 8,
FlowNodeType::Append => 9,
FlowNodeType::Sort {
..
} => 10,
FlowNodeType::Take {
..
} => 11,
FlowNodeType::Distinct {
..
} => 12,
FlowNodeType::Apply {
..
} => 13,
FlowNodeType::SinkTableView {
..
} => 20,
FlowNodeType::SinkRingBufferView {
..
} => 21,
FlowNodeType::SinkSeriesView {
..
} => 22,
FlowNodeType::SinkSubscription {
..
} => 15,
FlowNodeType::Window {
..
} => 16,
FlowNodeType::SourceSeries {
..
} => 18,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowNode {
pub id: FlowNodeId,
pub ty: FlowNodeType,
pub inputs: Vec<FlowNodeId>,
pub outputs: Vec<FlowNodeId>,
}
impl FlowNode {
pub fn new(id: impl Into<FlowNodeId>, ty: FlowNodeType) -> Self {
Self {
id: id.into(),
ty,
inputs: Vec::new(),
outputs: Vec::new(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FlowEdge {
pub id: FlowEdgeId,
pub source: FlowNodeId,
pub target: FlowNodeId,
}
impl FlowEdge {
pub fn new(id: impl Into<FlowEdgeId>, source: impl Into<FlowNodeId>, target: impl Into<FlowNodeId>) -> Self {
Self {
id: id.into(),
source: source.into(),
target: target.into(),
}
}
}