use std::sync::Arc;
use opentelemetry::global;
use talos_certifier::{ports::MessageReciever, ChannelMessage};
use talos_common_utils::backpressure::config::BackPressureConfig;
use talos_suffix::{
core::{SuffixConfig, SuffixMetricsConfig},
Suffix,
};
use tokio::{sync::mpsc, task::JoinHandle, try_join};
use crate::{
callbacks::{ReplicatorInstaller, ReplicatorSnapshotProvider},
core::Replicator,
errors::{ReplicatorError, ReplicatorErrorKind},
models::{ReplicatorCandidate, ReplicatorCandidateMessage},
otel::{
initialiser::{init_otel_logs_tracing, init_otel_metrics},
otel_config::ReplicatorOtelConfig,
},
services::{
replicator_service::{ReplicatorService, ReplicatorServiceConfig},
statemap_installer_service::{installation_service, StatemapInstallerConfig},
statemap_queue_service::{StatemapQueueService, StatemapQueueServiceConfig},
},
StatemapQueueChannelMessage,
};
/// Creates a bounded tokio `mpsc` channel that buffers at most `channel_size` messages.
///
/// Returns the `(sender, receiver)` pair. tokio's `mpsc::channel` panics when the
/// capacity is zero, so callers must pass a non-zero `channel_size`.
fn create_channel<T>(channel_size: usize) -> (mpsc::Sender<T>, mpsc::Receiver<T>) {
    mpsc::channel(channel_size)
}
/// Configuration consumed by `talos_cohort_replicator`.
///
/// Each field is forwarded to one of the replicator's internal components; the
/// per-field docs name the destination.
#[derive(Clone, Debug)]
pub struct CohortReplicatorConfig {
/// Forwarded as `enable_stats` to both the replicator service and the statemap queue service.
pub enable_stats: bool,
/// Capacity of every internal channel created by `talos_cohort_replicator`; also passed to
/// `StatemapQueueService::new`. Must be non-zero (tokio bounded channels reject a zero capacity).
pub channel_size: usize,
/// Used as `SuffixConfig::capacity` for the replicator suffix.
pub suffix_capacity: usize,
/// Used as `SuffixConfig::prune_start_threshold` for the replicator suffix.
pub suffix_prune_threshold: Option<usize>,
/// Used as `SuffixConfig::min_size_after_prune` for the replicator suffix.
pub suffix_minimum_size_on_prune: Option<usize>,
/// Commit frequency of the certifier message receiver, in milliseconds
/// (the replicator service's `commit_frequency_ms`).
pub certifier_message_receiver_commit_freq_ms: u64,
/// Statemap queue cleanup frequency, in milliseconds (`queue_cleanup_frequency_ms`).
pub statemap_queue_cleanup_freq_ms: u64,
/// Thread-pool size for the statemap installer. The installer config stores it as a `u16`,
/// so values above `u16::MAX` are not supported.
pub statemap_installer_threadpool: u64,
/// OTEL settings: `init_otel` controls logs/traces and `enable_metrics` controls metrics
/// initialisation from within the replicator.
pub otel_telemetry: ReplicatorOtelConfig,
/// Back-pressure settings handed to the replicator service.
pub backpressure: BackPressureConfig,
}
async fn flatten_service_result<T>(handle: JoinHandle<Result<T, ReplicatorError>>) -> Result<T, ReplicatorError> {
match handle.await {
Ok(Ok(result)) => Ok(result),
Ok(Err(err)) => Err(err),
Err(err) => Err(ReplicatorError {
kind: crate::errors::ReplicatorErrorKind::Internal,
reason: err.to_string(),
cause: None,
}),
}
}
pub async fn talos_cohort_replicator<M, Snap>(
certifier_message_receiver: M, statemap_installer: Arc<dyn ReplicatorInstaller + Send + Sync>, snapshot_api: Snap, statemap_channel: (mpsc::Sender<StatemapQueueChannelMessage>, mpsc::Receiver<StatemapQueueChannelMessage>),
config: CohortReplicatorConfig,
) -> Result<((), (), ()), ReplicatorError>
where
M: MessageReciever<Message = ChannelMessage<ReplicatorCandidateMessage>> + Send + Sync + 'static,
Snap: ReplicatorSnapshotProvider + Send + Sync + 'static,
{
if config.otel_telemetry.init_otel {
init_otel_logs_tracing(
config.otel_telemetry.name.clone(),
config.otel_telemetry.enable_traces,
config.otel_telemetry.grpc_endpoint.clone(),
"info",
)
.map_err(|e| ReplicatorError {
kind: ReplicatorErrorKind::Internal,
reason: "Unable to initialise OTEL logs and traces for replicator".into(),
cause: Some(format!("{:?}", e)),
})?;
} else {
tracing::warn!("Otel traces will not be initialised from within the replicator.");
tracing::warn!("If traces are required, either initialise from the calling app, or enable the flag otel_telemetry.enable_metrics to enable from within the library")
}
if config.otel_telemetry.enable_metrics {
init_otel_metrics(config.otel_telemetry.grpc_endpoint).map_err(|e| ReplicatorError {
kind: ReplicatorErrorKind::Internal,
reason: "Unable to initialise OTEL metrics for replicator".into(),
cause: Some(format!("{:?}", e)),
})?;
} else {
tracing::warn!("Otel metrics will not be initialised from within the replicator.");
tracing::warn!("If metrics are required, either initialise from the calling app, or enable the flag otel_telemetry.enable_metrics to enable from within the library")
}
let (tx_installation_feedback_to_replicator, rx_installation_feedback_to_replicator) = create_channel(config.channel_size);
let (tx_statemaps_to_install, rx_statemaps_to_install) = create_channel(config.channel_size);
let (tx_statemaps_install_feedback, rx_statemaps_install_feedback) = create_channel(config.channel_size);
let suffix_config = SuffixConfig {
capacity: config.suffix_capacity,
prune_start_threshold: config.suffix_prune_threshold,
min_size_after_prune: config.suffix_minimum_size_on_prune,
};
let (meter, suffix) = if config.otel_telemetry.enable_metrics {
let meter = global::meter("cohort_sdk_replicator");
let metric_options = (SuffixMetricsConfig { prefix: "repl".into() }, meter.clone());
let suffix: Suffix<ReplicatorCandidate> = Suffix::with_config(suffix_config, Some(metric_options));
(Some(meter), suffix)
} else {
let suffix: Suffix<ReplicatorCandidate> = Suffix::with_config(suffix_config, None);
(None, suffix)
};
let replicator = Replicator::new(certifier_message_receiver, suffix, meter.clone());
let replicator_service_configs = ReplicatorServiceConfig {
commit_frequency_ms: config.certifier_message_receiver_commit_freq_ms,
enable_stats: config.enable_stats,
backpressure: config.backpressure,
};
let mut replicator_service = ReplicatorService::new(
statemap_channel.0,
rx_installation_feedback_to_replicator,
replicator,
replicator_service_configs,
);
let replicator_handle = tokio::spawn(async move { replicator_service.run().await });
let queue_config = StatemapQueueServiceConfig {
enable_stats: config.enable_stats,
queue_cleanup_frequency_ms: config.statemap_queue_cleanup_freq_ms,
};
let mut statemap_queue_service_ = StatemapQueueService::new(
statemap_channel.1,
tx_statemaps_to_install,
rx_statemaps_install_feedback,
tx_installation_feedback_to_replicator,
snapshot_api.into(),
queue_config,
config.channel_size,
meter,
);
let statemap_queue_handle = tokio::spawn(async move { statemap_queue_service_.run().await });
let installer_config = StatemapInstallerConfig {
thread_pool: Some(config.statemap_installer_threadpool as u16),
};
let statemap_installer_handle = tokio::spawn(installation_service(
Arc::clone(&statemap_installer),
rx_statemaps_to_install,
tx_statemaps_install_feedback,
installer_config,
));
try_join!(
flatten_service_result(replicator_handle),
flatten_service_result(statemap_queue_handle),
flatten_service_result(statemap_installer_handle)
)
}