use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use laminar_core::streaming::{Record, Subscription};
use crate::catalog::{ArrowRecord, SourceEntry};
use crate::DbError;
/// Outcome of executing a single SQL statement.
#[derive(Debug)]
pub enum ExecuteResult {
    /// A DDL statement completed; carries the statement type and object name.
    Ddl(DdlInfo),
    /// A streaming query was registered; results arrive through the handle.
    Query(QueryHandle),
    /// A statement that modified data; the number of rows affected.
    RowsAffected(u64),
    /// A statement that returned a single batch of metadata
    /// (presumably introspection/`SHOW`-style output — confirm with callers).
    Metadata(RecordBatch),
}
impl ExecuteResult {
    /// Unwraps the `Query` variant, rejecting every other result kind.
    ///
    /// # Errors
    ///
    /// Returns `DbError::InvalidOperation` when `self` is not `Query`.
    pub fn into_query(self) -> Result<QueryHandle, DbError> {
        if let Self::Query(handle) = self {
            Ok(handle)
        } else {
            Err(DbError::InvalidOperation(
                "Expected a query result".to_string(),
            ))
        }
    }
}
/// Summary of a completed DDL statement.
#[derive(Debug, Clone)]
pub struct DdlInfo {
    /// Kind of statement that ran (e.g. the DDL verb) — exact vocabulary
    /// is set by the executor; confirm against the statement handler.
    pub statement_type: String,
    /// Name of the object the statement acted on.
    pub object_name: String,
}
/// Handle to a registered streaming query.
///
/// Holds the query's identity and schema plus the one-shot subscription
/// through which result batches are delivered.
#[derive(Debug)]
pub struct QueryHandle {
    /// Engine-assigned query id.
    pub(crate) id: u64,
    /// Arrow schema of the rows this query emits.
    pub(crate) schema: SchemaRef,
    /// SQL text the query was created from.
    pub(crate) sql: String,
    /// Taken at most once by `subscribe`/`subscribe_raw`; `None` afterwards
    /// and after `cancel`.
    pub(crate) subscription: Option<Subscription<ArrowRecord>>,
    /// Cleared by `cancel`.
    pub(crate) active: bool,
}
impl QueryHandle {
    /// Arrow schema of the rows this query emits.
    #[must_use]
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// SQL text the query was created from.
    #[must_use]
    pub fn sql(&self) -> &str {
        self.sql.as_str()
    }

    /// Engine-assigned identifier for this query.
    #[must_use]
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Whether the query has not been cancelled.
    #[must_use]
    pub fn is_active(&self) -> bool {
        self.active
    }

    /// Moves the underlying subscription out of this handle.
    ///
    /// # Errors
    ///
    /// Returns `DbError::InvalidOperation` once the subscription has
    /// already been taken — it can be consumed exactly once.
    pub(crate) fn subscribe_raw(&mut self) -> Result<Subscription<ArrowRecord>, DbError> {
        match self.subscription.take() {
            Some(sub) => Ok(sub),
            None => Err(DbError::InvalidOperation(
                "Subscription already consumed".to_string(),
            )),
        }
    }

    /// Consumes the raw subscription and wraps it in a typed adapter.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`Self::subscribe_raw`] when the
    /// subscription was already consumed.
    pub fn subscribe<T: FromBatch>(&mut self) -> Result<TypedSubscription<T>, DbError> {
        let raw = self.subscribe_raw()?;
        Ok(TypedSubscription {
            inner: raw,
            _phantom: PhantomData,
        })
    }

    /// Marks the query inactive and drops any unconsumed subscription.
    pub fn cancel(&mut self) {
        self.subscription = None;
        self.active = false;
    }
}
/// Conversion from an Arrow [`RecordBatch`] into typed row values.
pub trait FromBatch: Sized {
    /// Decodes the value stored at `row` of `batch`.
    fn from_batch(batch: &RecordBatch, row: usize) -> Self;
    /// Decodes every row of `batch` into a vector of values.
    fn from_batch_all(batch: &RecordBatch) -> Vec<Self>;
}
/// Typed wrapper around a raw Arrow subscription: every received batch is
/// decoded into `Vec<T>` via [`FromBatch`].
pub struct TypedSubscription<T: FromBatch> {
    /// The untyped subscription delivering Arrow batches.
    inner: Subscription<ArrowRecord>,
    /// Marks the decoded row type without storing a `T`.
    _phantom: PhantomData<T>,
}
impl<T: FromBatch> TypedSubscription<T> {
    /// Wraps an untyped Arrow subscription in this typed adapter.
    pub(crate) fn from_raw(raw: Subscription<ArrowRecord>) -> Self {
        Self {
            inner: raw,
            _phantom: PhantomData,
        }
    }

    /// Non-blocking poll for the next batch, decoded into `T` values.
    pub fn poll(&mut self) -> Option<Vec<T>> {
        let batch = self.inner.poll()?;
        Some(T::from_batch_all(&batch))
    }

    /// Blocks until the next batch arrives, decoded into `T` values.
    ///
    /// # Errors
    ///
    /// Propagates the underlying subscription's `RecvError`.
    pub fn recv(&mut self) -> Result<Vec<T>, laminar_core::streaming::RecvError> {
        let batch = self.inner.recv()?;
        Ok(T::from_batch_all(&batch))
    }

    /// Like [`Self::recv`], but gives up after `timeout`.
    ///
    /// # Errors
    ///
    /// Propagates the underlying subscription's `RecvError`.
    pub fn recv_timeout(
        &mut self,
        timeout: Duration,
    ) -> Result<Vec<T>, laminar_core::streaming::RecvError> {
        let batch = self.inner.recv_timeout(timeout)?;
        Ok(T::from_batch_all(&batch))
    }

    /// Polls up to `max_batches` batches, invoking `f` on each decoded item.
    ///
    /// Stops early when `f` returns `false` or no batch is immediately
    /// available. Returns the number of items handed to `f`.
    pub fn poll_each<F: FnMut(T) -> bool>(&mut self, max_batches: usize, mut f: F) -> usize {
        let mut delivered = 0;
        for _ in 0..max_batches {
            if let Some(batch) = self.inner.poll() {
                for item in T::from_batch_all(&batch) {
                    delivered += 1;
                    if !f(item) {
                        return delivered;
                    }
                }
            } else {
                break;
            }
        }
        delivered
    }

    /// Unwraps the adapter back into the raw subscription.
    #[allow(dead_code)]
    pub(crate) fn into_raw(self) -> Subscription<ArrowRecord> {
        self.inner
    }
}
impl<T: FromBatch> std::fmt::Debug for TypedSubscription<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The inner subscription carries no useful `Debug` output here,
        // so only the type name is emitted.
        let mut out = f.debug_struct("TypedSubscription");
        out.finish()
    }
}
/// Typed handle for pushing `T` records into a catalog source.
pub struct SourceHandle<T: Record> {
    /// Shared catalog entry backing this source.
    entry: Arc<SourceEntry>,
    /// Marks the record type without storing a `T`.
    _phantom: PhantomData<T>,
}
impl<T: Record> SourceHandle<T> {
    /// Builds a typed handle after checking that `T`'s schema width matches
    /// the catalog entry's column count.
    ///
    /// NOTE(review): only the field *count* is validated here, not names or
    /// types — confirm deeper schema validation happens elsewhere.
    ///
    /// # Errors
    ///
    /// Returns `DbError::SchemaMismatch` when the field counts differ.
    pub(crate) fn new(entry: Arc<SourceEntry>) -> Result<Self, DbError> {
        let rust_fields = T::schema().fields().len();
        let sql_fields = entry.schema.fields().len();
        if rust_fields != sql_fields {
            return Err(DbError::SchemaMismatch(format!(
                "Rust type has {} fields but source '{}' has {} columns",
                rust_fields, entry.name, sql_fields
            )));
        }
        Ok(Self {
            entry,
            _phantom: PhantomData,
        })
    }

    /// Converts a single record to an Arrow batch and pushes it.
    ///
    /// # Errors
    ///
    /// Propagates the streaming layer's push error.
    #[allow(clippy::needless_pass_by_value)]
    pub fn push(&self, record: T) -> Result<(), laminar_core::streaming::StreamingError> {
        self.entry.push_and_buffer(record.to_record_batch())
    }

    /// Pushes records in fixed-size Arrow batches.
    ///
    /// Returns the number of records accepted; stops at the first push that
    /// fails, leaving later records undelivered.
    pub fn push_batch(&self, records: impl IntoIterator<Item = T>) -> usize {
        const BATCH_SIZE: usize = 1024;
        let mut accepted = 0;
        let mut pending = Vec::with_capacity(BATCH_SIZE);
        for record in records {
            pending.push(record);
            // The buffer grows one element at a time, so it hits exactly
            // BATCH_SIZE before being drained.
            if pending.len() == BATCH_SIZE {
                let batch = T::to_record_batch_from_iter(pending.drain(..));
                match self.push_arrow(batch) {
                    Ok(()) => accepted += BATCH_SIZE,
                    Err(_) => return accepted,
                }
            }
        }
        if !pending.is_empty() {
            let tail = pending.len();
            let batch = T::to_record_batch_from_iter(pending);
            if self.push_arrow(batch).is_ok() {
                accepted += tail;
            }
        }
        accepted
    }

    /// Pushes a caller-built Arrow batch directly.
    ///
    /// # Errors
    ///
    /// Propagates the streaming layer's push error.
    pub fn push_arrow(
        &self,
        batch: RecordBatch,
    ) -> Result<(), laminar_core::streaming::StreamingError> {
        self.entry.push_and_buffer(batch)
    }

    /// Advances the source's watermark to `timestamp`.
    pub fn watermark(&self, timestamp: i64) {
        self.entry.source.watermark(timestamp);
    }

    /// Current watermark of the underlying source.
    #[must_use]
    pub fn current_watermark(&self) -> i64 {
        self.entry.source.current_watermark()
    }

    /// Number of records currently pending in the source.
    #[must_use]
    pub fn pending(&self) -> usize {
        self.entry.source.pending()
    }

    /// Capacity of the underlying source.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.entry.source.capacity()
    }

    /// Whether pending/capacity crosses the crate's backpressure threshold.
    #[must_use]
    pub fn is_backpressured(&self) -> bool {
        crate::metrics::is_backpressured(self.pending(), self.capacity())
    }

    /// Catalog name of this source.
    #[must_use]
    pub fn name(&self) -> &str {
        &self.entry.name
    }

    /// Arrow schema recorded for this source.
    #[must_use]
    pub fn schema(&self) -> &SchemaRef {
        &self.entry.schema
    }

    /// Configured maximum out-of-orderness window, if any.
    #[must_use]
    pub fn max_out_of_orderness(&self) -> Option<Duration> {
        self.entry.max_out_of_orderness
    }

    /// Selects which column carries event time for this source.
    pub fn set_event_time_column(&self, column: &str) {
        self.entry.source.set_event_time_column(column);
    }
}
impl<T: Record> std::fmt::Debug for SourceHandle<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Shows the source name plus a live pending count snapshot.
        let mut out = f.debug_struct("SourceHandle");
        out.field("name", &self.entry.name);
        out.field("pending", &self.pending());
        out.finish()
    }
}
/// Handle for pushing raw Arrow batches into a catalog source without a
/// typed `Record` binding.
pub struct UntypedSourceHandle {
    /// Shared catalog entry backing this source.
    entry: Arc<SourceEntry>,
}
impl UntypedSourceHandle {
pub(crate) fn new(entry: Arc<SourceEntry>) -> Self {
Self { entry }
}
pub fn push_arrow(
&self,
batch: RecordBatch,
) -> Result<(), laminar_core::streaming::StreamingError> {
self.entry.push_and_buffer(batch)
}
pub fn watermark(&self, timestamp: i64) {
self.entry.source.watermark(timestamp);
}
#[must_use]
pub fn current_watermark(&self) -> i64 {
self.entry.source.current_watermark()
}
#[must_use]
pub fn pending(&self) -> usize {
self.entry.source.pending()
}
#[must_use]
pub fn capacity(&self) -> usize {
self.entry.source.capacity()
}
#[must_use]
pub fn is_backpressured(&self) -> bool {
crate::metrics::is_backpressured(self.pending(), self.capacity())
}
#[must_use]
pub fn name(&self) -> &str {
&self.entry.name
}
#[must_use]
pub fn schema(&self) -> &SchemaRef {
&self.entry.schema
}
#[must_use]
pub fn max_out_of_orderness(&self) -> Option<Duration> {
self.entry.max_out_of_orderness
}
pub fn set_event_time_column(&self, column: &str) {
self.entry.source.set_event_time_column(column);
}
}
impl std::fmt::Debug for UntypedSourceHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the source name is meaningful to show here.
        let mut out = f.debug_struct("UntypedSourceHandle");
        out.field("name", &self.entry.name);
        out.finish()
    }
}
/// Role of a node in the pipeline topology.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineNodeType {
    /// Ingests records into the pipeline.
    Source,
    /// Intermediate streaming transformation.
    Stream,
    /// Terminal consumer of pipeline output.
    Sink,
}
/// One node of the pipeline topology graph.
#[derive(Debug, Clone)]
pub struct PipelineNode {
    /// Node name; used as the edge endpoint identifier.
    pub name: String,
    /// Whether this node is a source, stream, or sink.
    pub node_type: PipelineNodeType,
    /// Arrow schema of the node's records, when known.
    pub schema: Option<SchemaRef>,
    /// Defining SQL for the node, when it was created from SQL.
    pub sql: Option<String>,
}
/// Directed edge of the pipeline topology graph, referencing nodes by name.
#[derive(Debug, Clone)]
pub struct PipelineEdge {
    /// Name of the upstream node.
    pub from: String,
    /// Name of the downstream node.
    pub to: String,
}
/// Full pipeline graph: all nodes plus the directed edges between them.
#[derive(Debug, Clone)]
pub struct PipelineTopology {
    /// Every source, stream, and sink node.
    pub nodes: Vec<PipelineNode>,
    /// Directed data-flow edges between named nodes.
    pub edges: Vec<PipelineEdge>,
}
/// Descriptive record for a registered stream.
#[derive(Debug, Clone)]
pub struct StreamInfo {
    /// Stream name.
    pub name: String,
    /// Defining SQL, when the stream was created from SQL.
    pub sql: Option<String>,
}
/// Descriptive record for a registered source.
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name.
    pub name: String,
    /// Arrow schema of the source's columns.
    pub schema: SchemaRef,
    /// Presumably the event-time column used for watermark generation,
    /// if one is configured — confirm against the catalog.
    pub watermark_column: Option<String>,
}
/// Descriptive record for a registered sink.
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name.
    pub name: String,
}
/// Descriptive record for a registered query.
#[derive(Debug, Clone)]
pub struct QueryInfo {
    /// Engine-assigned query id.
    pub id: u64,
    /// SQL text of the query.
    pub sql: String,
    /// Whether the query is still active (not cancelled).
    pub active: bool,
}