use crate::config::{Config, EndpointConfig};
use crate::dedup::ConcurrentDedup;
use crate::endpoint_core::ExponentialBackoff;
use crate::error::Result;
use crate::filter::EndpointFilters;
use crate::router::{create_bus, MessageBus};
use crate::routing::RoutingTable;
use parking_lot::RwLock;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
/// Handle to a fully spawned router: the join handles of every background
/// task plus the shared resources (bus, routing table) those tasks operate on.
/// Returned by [`spawn_all`]; hold it to join or inspect the spawned tasks.
#[allow(dead_code)]
pub struct OrchestratedRouter {
    /// Join handles for all spawned tasks (maintenance loops and endpoints).
    pub handles: Vec<JoinHandle<()>>,
    /// Message bus shared by all endpoints (each task holds a cloned
    /// sender and/or its own subscription).
    pub bus: MessageBus,
    /// Shared routing table; written by endpoints and pruned periodically
    /// by a dedicated background task.
    pub routing_table: Arc<RwLock<RoutingTable>>,
}
/// Builds the message bus and routing table, then spawns every long-running
/// task described by `config`:
///
/// 1. a dedup bucket rotator (only when dedup is enabled),
/// 2. a routing-table pruner,
/// 3. an implicit TCP server if `general.tcp_port` is set,
/// 4. a telemetry logger if logging + telemetry are enabled,
/// 5. one supervised task per configured endpoint (UDP / TCP / Serial).
///
/// Every task receives a child token of `cancel_token`, so cancelling the
/// parent shuts the whole set down. Returns the join handles together with
/// the shared bus and routing table.
pub fn spawn_all(config: &Config, cancel_token: &CancellationToken) -> OrchestratedRouter {
    let bus = create_bus(config.general.bus_capacity);
    let routing_table = Arc::new(RwLock::new(RoutingTable::new()));
    let mut handles = Vec::new();
    // A missing dedup period is treated as 0; presumably 0 disables dedup
    // (the rotator below is only spawned for a non-zero rotation interval).
    let dedup_period = config.general.dedup_period_ms.unwrap_or(0);
    let dedup = ConcurrentDedup::new(Duration::from_millis(dedup_period));
    let prune_ttl = config.general.routing_table_ttl_secs;
    let prune_interval = config.general.routing_table_prune_interval_secs;
    let dedup_rotation_interval = dedup.rotation_interval();
    // --- Dedup bucket rotator (skipped entirely when dedup is disabled) ---
    if !dedup_rotation_interval.is_zero() {
        let dedup_rotator = dedup.clone();
        let dedup_token = cancel_token.child_token();
        handles.push(tokio::spawn(async move {
            // NOTE(review): tokio intervals fire their first tick
            // immediately, so one rotation happens right at startup.
            let mut interval = tokio::time::interval(dedup_rotation_interval);
            loop {
                tokio::select! {
                    _ = dedup_token.cancelled() => {
                        info!("Dedup Rotator shutting down.");
                        break;
                    }
                    _ = interval.tick() => {
                        dedup_rotator.rotate_buckets();
                    }
                }
            }
        }));
    }
    // --- Routing-table pruner: drops entries older than `prune_ttl` every
    // `prune_interval` seconds ---
    {
        let rt_prune = routing_table.clone();
        let prune_token = cancel_token.child_token();
        handles.push(tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = prune_token.cancelled() => {
                        info!("RoutingTable Pruner shutting down.");
                        break;
                    }
                    _ = tokio::time::sleep(Duration::from_secs(prune_interval)) => {
                        // Write lock is held only for the duration of the
                        // prune call, then dropped before sleeping again.
                        let mut rt = rt_prune.write();
                        rt.prune(Duration::from_secs(prune_ttl));
                    }
                }
            }
        }));
    }
    // --- Implicit TCP server, enabled by `general.tcp_port` ---
    if let Some(port) = config.general.tcp_port {
        let name = format!("Implicit TCP Server :{}", port);
        let bus_tx = bus.sender();
        let rt = routing_table.clone();
        let dd = dedup.clone();
        // Endpoint ids 0..len are taken by configured endpoints (see the
        // enumerate loop below), so `len` is the first free id.
        let id = config.endpoint.len();
        let filters = EndpointFilters::default();
        let addr = format!("0.0.0.0:{}", port);
        let task_token = cancel_token.child_token();
        handles.push(tokio::spawn(supervise(
            name,
            task_token.clone(),
            // The factory re-clones everything per restart so the supervisor
            // can rebuild the task after a failure.
            move || {
                let bus_tx = bus_tx.clone();
                let bus_rx = bus_tx.subscribe();
                let rt = rt.clone();
                let dd = dd.clone();
                let filters = filters.clone();
                let addr = addr.clone();
                let m = crate::config::EndpointMode::Server;
                let token = task_token.clone();
                async move {
                    crate::endpoints::tcp::run(id, addr, m, bus_tx, bus_rx, rt, dd, filters, token)
                        .await
                }
            },
        )));
    }
    // --- Telemetry logger: requires both a log directory and the
    // `log_telemetry` flag ---
    if let Some(log_dir) = &config.general.log {
        if config.general.log_telemetry {
            let name = format!("TLog Logger {}", log_dir);
            let bus_tx_tlog = bus.sender();
            let dir = log_dir.clone();
            let task_token = cancel_token.child_token();
            handles.push(tokio::spawn(supervise(
                name,
                task_token.clone(),
                move || {
                    // Fresh subscription per restart so the logger does not
                    // resume from a lagged receiver.
                    let bus_rx = bus_tx_tlog.subscribe();
                    let dir = dir.clone();
                    let token = task_token.clone();
                    async move { crate::endpoints::tlog::run(dir, bus_rx, token).await }
                },
            )));
        }
    }
    // --- Configured endpoints: one supervised task per entry, id = index ---
    for (i, endpoint_config) in config.endpoint.iter().enumerate() {
        let bus = bus.clone();
        let bus_tx = bus.sender();
        let routing_table = routing_table.clone();
        let dedup = dedup.clone();
        let task_token = cancel_token.child_token();
        match endpoint_config {
            EndpointConfig::Udp {
                address,
                mode,
                filters,
            } => {
                let name = format!("UDP Endpoint {} ({})", i, address);
                let address = address.clone();
                let mode = mode.clone();
                let filters = filters.clone();
                // UDP uniquely takes a TTL so it can expire stale peers;
                // it reuses the routing-table TTL for that purpose.
                let cleanup_ttl = prune_ttl;
                handles.push(tokio::spawn(supervise(
                    name,
                    task_token.clone(),
                    move || {
                        crate::endpoints::udp::run(
                            i,
                            address.clone(),
                            mode.clone(),
                            bus_tx.clone(),
                            bus.subscribe(),
                            routing_table.clone(),
                            dedup.clone(),
                            filters.clone(),
                            task_token.clone(),
                            cleanup_ttl,
                        )
                    },
                )));
            }
            EndpointConfig::Tcp {
                address,
                mode,
                filters,
            } => {
                let name = format!("TCP Endpoint {} ({})", i, address);
                let address = address.clone();
                let mode = mode.clone();
                let filters = filters.clone();
                handles.push(tokio::spawn(supervise(
                    name,
                    task_token.clone(),
                    move || {
                        crate::endpoints::tcp::run(
                            i,
                            address.clone(),
                            mode.clone(),
                            bus_tx.clone(),
                            bus.subscribe(),
                            routing_table.clone(),
                            dedup.clone(),
                            filters.clone(),
                            task_token.clone(),
                        )
                    },
                )));
            }
            EndpointConfig::Serial {
                device,
                baud,
                filters,
            } => {
                let name = format!("Serial Endpoint {} ({})", i, device);
                let device = device.clone();
                let baud = *baud;
                let filters = filters.clone();
                handles.push(tokio::spawn(supervise(
                    name,
                    task_token.clone(),
                    move || {
                        crate::endpoints::serial::run(
                            i,
                            device.clone(),
                            baud,
                            bus_tx.clone(),
                            bus.subscribe(),
                            routing_table.clone(),
                            dedup.clone(),
                            filters.clone(),
                            task_token.clone(),
                        )
                    },
                )));
            }
        }
    }
    OrchestratedRouter {
        handles,
        bus,
        routing_table,
    }
}
/// Runs the future produced by `task_factory` in a restart loop until
/// `cancel_token` fires.
///
/// Restarts are spaced by an exponential backoff (1s base, 30s cap, factor
/// 2.0). A run that survives longer than 60 seconds is considered healthy
/// and resets the backoff, so a long-lived task that later dies restarts
/// quickly again. Cancellation is honoured both while the task is running
/// (the in-flight future is dropped) and during the backoff wait.
pub async fn supervise<F, Fut>(name: String, cancel_token: CancellationToken, task_factory: F)
where
    F: Fn() -> Fut + Send + 'static,
    Fut: std::future::Future<Output = Result<()>> + Send + 'static,
{
    let mut backoff = ExponentialBackoff::new(Duration::from_secs(1), Duration::from_secs(30), 2.0);
    loop {
        // Race one task run against shutdown. The run reports whether it
        // lived long enough to be considered healthy, plus its outcome.
        tokio::select! {
            _ = cancel_token.cancelled() => {
                info!("Supervisor for {} shutting down.", name);
                break;
            }
            (ran_long, outcome) = async {
                let launched_at = std::time::Instant::now();
                let outcome = task_factory().await;
                (launched_at.elapsed() > Duration::from_secs(60), outcome)
            } => {
                if ran_long {
                    // A long, healthy run restarts the backoff sequence.
                    backoff.reset();
                }
                match outcome {
                    Ok(_) => {
                        warn!("Supervisor: Task {} finished cleanly (unexpected). Restarting...", name);
                    }
                    Err(e) => {
                        error!("Supervisor: Task {} failed: {:#}. Restarting...", name, e);
                    }
                }
            }
        }
        let wait = backoff.next_backoff();
        info!(
            "Supervisor: Waiting {:.1?} before restarting {}",
            wait, name
        );
        // Stay responsive to shutdown while backing off.
        tokio::select! {
            _ = cancel_token.cancelled() => {
                info!("Supervisor for {} shutting down during backoff.", name);
                break;
            }
            _ = tokio::time::sleep(wait) => {},
        }
    }
}