use reifydb_core::{
interface::{
catalog::{
flow::FlowStatus,
id::{NamespaceId, SinkId},
sink::Sink,
},
store::MultiVersionRow,
},
key::sink::SinkKey,
};
use reifydb_transaction::transaction::Transaction;
use serde_json::from_str;
use super::MaterializedCatalog;
use crate::{
Result,
store::sink::shape::{
sink,
sink::{CONFIG, CONNECTOR, ID, NAME, NAMESPACE, SOURCE_NAME, SOURCE_NAMESPACE, STATUS},
},
};
/// Populates `catalog` with every sink found by a full scan of the sink key range.
///
/// Each entry yielded by the scan is decoded with `convert_sink` and registered
/// through `catalog.set_sink` under the sink's own id, at the version carried
/// by the scanned entry.
///
/// # Errors
///
/// Returns early with the error if starting the range scan fails or if any
/// yielded entry is an `Err`.
pub(crate) fn load_sinks(rx: &mut Transaction<'_>, catalog: &MaterializedCatalog) -> Result<()> {
    // NOTE(review): 1024 is presumably the scan batch size — confirm against `Transaction::range`.
    for item in rx.range(SinkKey::full_scan(), 1024)? {
        let stored = item?;
        // Read the version first: `stored` is moved into `convert_sink` below.
        let stored_version = stored.version;
        let decoded = convert_sink(stored);
        catalog.set_sink(decoded.id, stored_version, Some(decoded));
    }
    Ok(())
}
fn convert_sink(multi: MultiVersionRow) -> Sink {
let row = multi.row;
let id = SinkId(sink::SHAPE.get_u64(&row, ID));
let namespace = NamespaceId(sink::SHAPE.get_u64(&row, NAMESPACE));
let name = sink::SHAPE.get_utf8(&row, NAME).to_string();
let source_namespace = NamespaceId(sink::SHAPE.get_u64(&row, SOURCE_NAMESPACE));
let source_name = sink::SHAPE.get_utf8(&row, SOURCE_NAME).to_string();
let connector = sink::SHAPE.get_utf8(&row, CONNECTOR).to_string();
let config_json = sink::SHAPE.get_utf8(&row, CONFIG);
let config: Vec<(String, String)> = from_str(config_json).unwrap_or_default();
let status = FlowStatus::from_u8(sink::SHAPE.get_u8(&row, STATUS));
Sink {
id,
namespace,
name,
source_namespace,
source_name,
connector,
config,
status,
}
}