use std::sync::Arc;
use anyhow::{Context, ensure};
use async_trait::async_trait;
use daemon::{Backends, DaemonOptions, SourceParts};
use schema::{Config, Sink as SinkConfig, SourceType};
use sinks_core::{FanOutSink, Sink};
use sinks_opensearch::OpensearchSink;
use sinks_stdout::StdoutSink;
use sources_core::SourceSpec;
use sources_core::cdc::ChangeCapture;
use sources_core::document::DocumentBuilder;
use sources_postgres::{PgDocumentBuilder, ReplicationConfig, WalChangeCapture};
use url::Url;
/// Stateless unit type that implements the daemon's [`Backends`] trait,
/// building the source parts and the sink from a [`Config`].
#[derive(Debug, Default)]
pub(crate) struct FlussoBackends;
#[async_trait]
impl Backends for FlussoBackends {
async fn source(
&self,
config: Arc<Config>,
options: &DaemonOptions,
) -> anyhow::Result<SourceParts> {
ensure!(
config.source.source_type == SourceType::Postgres,
"only postgres sources are supported",
);
let connection_url = resolve_connection_url(&config)?;
let replication = replication_config(&connection_url, &options.slot, &options.publication)?;
let capture: Arc<dyn ChangeCapture> =
Arc::new(WalChangeCapture::new(replication, connection_url.clone()));
let documents = build_documents(&connection_url, &config).await?;
Ok(SourceParts { capture, documents })
}
async fn sink(
&self,
config: &Config,
options: &DaemonOptions,
) -> anyhow::Result<Arc<dyn Sink>> {
build_sink(config, options.pretty)
}
}
/// Resolves the source connection URL from `config` and returns it as an
/// owned `String`.
///
/// # Errors
///
/// Returns the underlying resolution failure wrapped with the context
/// "resolving the source connection URL".
fn resolve_connection_url(config: &Config) -> anyhow::Result<String> {
    config
        .source
        .resolve_connection_url()
        .map(|resolved| resolved.as_ref().to_owned())
        .context("resolving the source connection URL")
}
fn replication_config(
connection_url: &str,
slot: &str,
publication: &str,
) -> anyhow::Result<ReplicationConfig> {
let url = Url::parse(connection_url).context("parsing connection URL")?;
let host = url
.host_str()
.context("connection URL has no host")?
.to_owned();
let port = url.port().unwrap_or(5432);
let user = url.username();
ensure!(!user.is_empty(), "connection URL has no user");
let password = url.password().unwrap_or_default();
let database = url.path().trim_start_matches('/');
let database = if database.is_empty() { user } else { database };
Ok(ReplicationConfig::new(host, user, password, database, slot, publication).with_port(port))
}
/// Connects a [`PgDocumentBuilder`] to `connection_url`, using the source
/// spec derived from `config`, and returns it as a shared
/// [`DocumentBuilder`] trait object.
///
/// # Errors
///
/// Returns the connection failure wrapped with the context
/// "connecting to Postgres".
async fn build_documents(
    connection_url: &str,
    config: &Config,
) -> anyhow::Result<Arc<dyn DocumentBuilder>> {
    let connected = PgDocumentBuilder::connect(connection_url, Arc::new(source_spec(config)))
        .await
        .context("connecting to Postgres")?;
    let documents: Arc<dyn DocumentBuilder> = Arc::new(connected);
    Ok(documents)
}
/// Builds a [`SourceSpec`] from the enabled indexes in `config`.
///
/// Each enabled index contributes a clone of its name and its schema;
/// indexes whose `enabled` flag is false are left out.
pub(crate) fn source_spec(config: &Config) -> SourceSpec {
    SourceSpec::new(
        config
            .indexes
            .iter()
            .filter_map(|(name, index)| {
                index
                    .enabled
                    .then(|| (name.clone(), index.schema.clone()))
            })
            .collect(),
    )
}
/// Builds the sink described by `config.sinks`.
///
/// * No configured sinks: a [`StdoutSink`] created with `pretty`.
/// * Exactly one: that sink on its own.
/// * More than one: a [`FanOutSink`] wrapping all of them.
///
/// `pretty` is only used for the fallback stdout sink; configured stdout
/// sinks are built from their own config entry.
///
/// # Errors
///
/// Fails when an OpenSearch sink cannot be built from its config; the error
/// context names the offending sink.
fn build_sink(config: &Config, pretty: bool) -> anyhow::Result<Arc<dyn Sink>> {
    let mut built = config
        .sinks
        .iter()
        .map(|(name, sink_config)| -> anyhow::Result<Arc<dyn Sink>> {
            let sink: Arc<dyn Sink> = match sink_config {
                SinkConfig::Opensearch(os) => Arc::new(
                    OpensearchSink::from_config(name, os)
                        .with_context(|| format!("building OpenSearch sink '{name}'"))?,
                ),
                SinkConfig::Stdout(s) => Arc::new(StdoutSink::from_config(s)),
            };
            Ok(sink)
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    if built.len() > 1 {
        return Ok(Arc::new(FanOutSink::new(built)));
    }
    // Zero or one sink left: take the single one, or fall back to stdout.
    Ok(built
        .pop()
        .unwrap_or_else(|| Arc::new(StdoutSink::new(pretty))))
}