use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use parking_lot::RwLock;
use crate::{
api::{error::Result, SpringSinkWriterConfig},
pipeline::{SinkWriterModel, SinkWriterName},
stream_engine::autonomous_executor::task::sink_task::sink_writer::{
sink_writer_factory::SinkWriterFactory, SinkWriter,
},
};
/// Name-keyed registry of sink writers.
///
/// Each writer is stored as `Arc<Mutex<Box<dyn SinkWriter>>>`, so handles returned by
/// the repository can be cloned out while the map itself sits behind an `RwLock`.
// The nested generic type of `sinks` is what the `type_complexity` allowance is for.
#[allow(clippy::type_complexity)]
#[derive(Debug)]
pub struct SinkWriterRepository {
/// Configuration handed to `SinkWriterFactory::sink` whenever a new writer is built.
config: SpringSinkWriterConfig,
/// Registered sink writers, keyed by their name.
sinks: RwLock<HashMap<SinkWriterName, Arc<Mutex<Box<dyn SinkWriter>>>>>,
}
impl SinkWriterRepository {
    /// Creates an empty repository that builds its sink writers with `config`.
    pub fn new(config: SpringSinkWriterConfig) -> Self {
        let sinks = RwLock::default();
        Self { config, sinks }
    }

    /// Registers the sink writer described by `sink_writer`.
    ///
    /// If a writer with the same name is already present, nothing is built and the
    /// existing entry is left untouched. Otherwise a new writer is created through
    /// `SinkWriterFactory::sink` and stored under the model's name.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `SinkWriterFactory::sink`; in that case the
    /// repository is not modified.
    pub fn register(&self, sink_writer: &SinkWriterModel) -> Result<()> {
        // The write lock is held for the whole call so the existence check and the
        // insertion are observed as a single step by other users of the repository.
        let mut registry = self.sinks.write();
        let name = sink_writer.name();

        if registry.contains_key(name) {
            return Ok(());
        }

        let writer = SinkWriterFactory::sink(
            sink_writer.sink_writer_type(),
            sink_writer.options(),
            &self.config,
        )?;
        let shared = Arc::new(Mutex::new(writer as Box<dyn SinkWriter>));
        let _ = registry.insert(name.clone(), shared);

        log::debug!("[SinkWriterRepository] registered sink subtask: {}", name);
        Ok(())
    }

    /// Returns a shared handle to the sink writer registered under `name`.
    ///
    /// # Panics
    ///
    /// Panics if no sink writer has been registered under `name`.
    pub fn get_sink_writer(&self, name: &SinkWriterName) -> Arc<Mutex<Box<dyn SinkWriter>>> {
        let registry = self.sinks.read();
        match registry.get(name) {
            Some(writer) => Arc::clone(writer),
            None => panic!("sink name ({}) not registered yet", name),
        }
    }
}