#![doc = include_str!("stream_engine.md")]
pub mod autonomous_executor;
pub mod command;
mod in_memory_queue_repository;
mod sql_executor;
pub mod time;
use std::sync::{Arc, Mutex, MutexGuard};
use anyhow::anyhow;
pub use crate::stream_engine::autonomous_executor::SpringValue;
pub use autonomous_executor::{NnSqlValue, RowTime, SqlCompareResult, SqlValue, StreamRow, Tuple};
use crate::{
api::{error::Result, SpringConfig, SpringError},
pipeline::{Pipeline, QueueName},
stream_engine::{
autonomous_executor::{AutonomousExecutor, SchemalessRow},
command::AlterPipelineCommand,
in_memory_queue_repository::InMemoryQueueRepository,
sql_executor::SqlExecutor,
},
};
/// Shareable handle to a single [`StreamEngine`].
///
/// The engine is stored behind `Arc<Mutex<_>>`, so cloning this handle yields
/// another reference to the same engine rather than a new engine.
#[derive(Clone, Debug)]
pub struct EngineMutex(Arc<Mutex<StreamEngine>>);
impl EngineMutex {
    /// Builds a new [`StreamEngine`] from `config` and wraps it in a shared mutex.
    pub fn new(config: &SpringConfig) -> Self {
        Self(Arc::new(Mutex::new(StreamEngine::new(config))))
    }

    /// Locks the underlying mutex and returns a guard giving access to the engine.
    ///
    /// # Errors
    ///
    /// Returns [`SpringError::SpringQlCoreIo`] when the lock cannot be acquired
    /// because the mutex is poisoned, i.e. another thread panicked while
    /// holding it.
    pub fn get(&self) -> Result<MutexGuard<'_, StreamEngine>> {
        match self.0.lock() {
            Ok(guard) => Ok(guard),
            Err(poisoned) => Err(SpringError::SpringQlCoreIo(anyhow!(
                "another thread sharing the same stream-engine got panic: {:?}",
                poisoned
            ))),
        }
    }
}
/// Facade that pairs a SQL executor with an autonomous executor.
///
/// Pipeline changes go through `sql_executor`, and each resulting pipeline is
/// then handed to `autonomous_executor`.
#[derive(Debug)]
pub struct StreamEngine {
// Applies `AlterPipelineCommand`s and provides the current `Pipeline`.
sql_executor: SqlExecutor,
// Notified with the updated pipeline after every successful alteration.
autonomous_executor: AutonomousExecutor,
}
impl StreamEngine {
    /// Creates an engine with a default SQL executor and an autonomous
    /// executor configured from `config`.
    pub fn new(config: &SpringConfig) -> Self {
        // Construction order matches the field order: SQL executor first.
        let sql_executor = SqlExecutor::default();
        let autonomous_executor = AutonomousExecutor::new(config);
        Self {
            sql_executor,
            autonomous_executor,
        }
    }

    /// Returns the pipeline currently held by the SQL executor.
    pub fn current_pipeline(&self) -> &Pipeline {
        self.sql_executor.current_pipeline()
    }

    /// Applies `command` through the SQL executor, then notifies the
    /// autonomous executor of the resulting pipeline.
    ///
    /// # Errors
    ///
    /// Propagates an error from the SQL executor's `alter_pipeline` (in which
    /// case the autonomous executor is not notified), or from the autonomous
    /// executor's `notify_pipeline_update`.
    pub fn alter_pipeline(&mut self, command: AlterPipelineCommand) -> Result<()> {
        log::debug!("[StreamEngine] alter_pipeline({:?})", command);

        let updated_pipeline = self.sql_executor.alter_pipeline(command)?;
        self.autonomous_executor
            .notify_pipeline_update(updated_pipeline)
    }

    /// Pops one row from the in-memory queue named `queue_name` without blocking.
    ///
    /// Returns whatever the queue's `pop_non_blocking` yields: `Some(row)` or
    /// `None`.
    ///
    /// # Errors
    ///
    /// Propagates the error from looking up `queue_name` in the
    /// [`InMemoryQueueRepository`].
    pub fn pop_in_memory_queue_non_blocking(
        &mut self,
        queue_name: QueueName,
    ) -> Result<Option<SchemalessRow>> {
        let queue = InMemoryQueueRepository::instance().get(&queue_name)?;
        Ok(queue.pop_non_blocking())
    }

    /// Pushes `row` onto the in-memory queue named `queue_name`.
    ///
    /// # Errors
    ///
    /// Propagates the error from looking up `queue_name` in the
    /// [`InMemoryQueueRepository`].
    pub fn push_in_memory_queue(
        &mut self,
        queue_name: QueueName,
        row: SchemalessRow,
    ) -> Result<()> {
        let queue = InMemoryQueueRepository::instance().get(&queue_name)?;
        queue.push(row);
        Ok(())
    }
}