use std::collections::{BTreeMap, BTreeSet};
use futures::stream::{self, StreamExt};
use meerkat_mob::SpawnMemberSpec;
use meerkat_mob::ids::MeerkatId;
use meerkat_mob::runtime::MobMemberListEntry;
use meerkat_mob::runtime::reconcile::ReconcileOptions;
use crate::runtime::RuntimeRoute;
use crate::unified_runtime::types::meerkat_reconcile_report_to_wire;
use super::edge_types::{DesiredPeerEdge, EdgeMemberView, EdgeReconcileFailure};
use super::types::{
UnifiedRuntimeReconcileEdgesReport, UnifiedRuntimeReconcileError,
UnifiedRuntimeReconcileReport, UnifiedRuntimeReconcileRoutingReport,
};
use super::{
ROSTER_ROUTE_CHANNEL, ROSTER_ROUTE_PREFIX, ROSTER_ROUTE_SINK, ROSTER_ROUTE_TARGET_MODULE,
UnifiedRuntime,
};
/// Upper bound on the number of `unwire` calls kept in flight at once while
/// reconciling edges (passed to `buffer_unordered`).
const EDGE_RECONCILE_CONCURRENCY: usize = 64;
impl UnifiedRuntime {
/// Reconciles the mob against `desired_specs`, then brings peer edges and
/// roster routes in line with the resulting membership.
///
/// Steps, in order:
/// 1. Store `desired_specs` as the baseline member specs on `mob_runtime`.
/// 2. Run the mob reconcile with `retire_stale: true`.
/// 3. Register every listed member (including retiring ones) with
///    `console_events`, using the identity string for both arguments.
/// 4. Reconcile peer edges, then routing, for those members.
/// 5. Invoke `post_reconcile_hook`, if one is set, with a clone of the report.
///
/// # Errors
///
/// * `UnifiedRuntimeReconcileError::Mob` when the mob reconcile itself fails.
/// * Whatever `reconcile_routing_wiring` returns on failure. In both of these
///   cases the function returns before the post-reconcile hook is reached.
/// * `UnifiedRuntimeReconcileError::PartialFailure` carrying the full report
///   when the mob section of the report lists failures. Only the mob
///   failures are inspected here; edge failures stay inside the report.
pub async fn reconcile(
    &self,
    desired_specs: Vec<SpawnMemberSpec>,
) -> Result<UnifiedRuntimeReconcileReport, UnifiedRuntimeReconcileError> {
    // The mob reconcile below consumes the specs, so the baseline gets its own copy.
    self.mob_runtime
        .set_baseline_member_specs(desired_specs.clone())
        .await;

    let mob_id = self.mob_handle().mob_id().to_string();
    let options = ReconcileOptions { retire_stale: true };
    let mob_outcome = match self.mob_handle().reconcile(desired_specs, options).await {
        Ok(outcome) => outcome,
        Err(err) => return Err(UnifiedRuntimeReconcileError::Mob(err.into())),
    };
    let mob = meerkat_reconcile_report_to_wire(&mob_id, mob_outcome);

    let members = self.mob_handle().list_members_including_retiring().await;

    // Register each member and remember its identity string for the routing pass.
    let mut active_member_ids = Vec::with_capacity(members.len());
    for member in &members {
        let identity = member.agent_identity.to_string();
        self.console_events
            .register_runtime_identity(identity.clone(), identity.clone())
            .await;
        active_member_ids.push(identity);
    }

    let edges = self.reconcile_edges_from_members(members).await;
    let routing = self.reconcile_routing_wiring(active_member_ids).await?;
    let report = UnifiedRuntimeReconcileReport {
        mob,
        edges,
        routing,
    };

    if let Some(hook) = &self.post_reconcile_hook {
        hook(report.clone()).await;
    }

    if report.mob.failures.is_empty() {
        Ok(report)
    } else {
        Err(UnifiedRuntimeReconcileError::PartialFailure(Box::new(
            report,
        )))
    }
}
/// Reconciles peer edges for the members currently listed by the mob handle
/// (including retiring ones) and returns the resulting report.
///
/// When the report is not complete, a `ReconcileIncomplete` error event is
/// fired with the number of failures and of edges skipped for missing
/// members. The report is returned either way.
pub async fn reconcile_edges(&self) -> UnifiedRuntimeReconcileEdgesReport {
    let members = self.mob_handle().list_members_including_retiring().await;
    let edges_report = self.reconcile_edges_from_members(members).await;

    if edges_report.is_complete() {
        return edges_report;
    }

    let failures = edges_report.failures.len();
    let skipped = edges_report.skipped_missing_members.len();
    self.fire_error(super::types::ErrorEvent::ReconcileIncomplete { failures, skipped });
    edges_report
}
/// Brings the mob's peer wiring in line with the edges reported by
/// `edge_discovery` for `active_members`.
///
/// Returns a default (empty) report when no edge discovery is configured.
/// Otherwise the report carries the desired edges plus what happened to each
/// edge considered: skipped (an endpoint is not in `active_members`),
/// retained, pre-existing, newly wired, unwired, pruned from the managed set,
/// or failed.
///
/// Edges wired by this method are recorded in `managed_dynamic_edges`. Only
/// edges found in that set are ever unwired or pruned here; a desired edge
/// that is already wired but not in the set is reported as pre-existing and
/// left untouched.
///
/// Note: every report entry is built through `DesiredPeerEdge::new`; a pair
/// for which that constructor returns `Err` is left out of the report
/// (including failure entries) without any other signal.
pub(super) async fn reconcile_edges_from_members(
&self,
active_members: Vec<MobMemberListEntry>,
) -> UnifiedRuntimeReconcileEdgesReport {
// Without an edge discovery there is nothing to reconcile.
let edge_discovery = match &self.edge_discovery {
Some(d) => d,
None => return UnifiedRuntimeReconcileEdgesReport::default(),
};
// Identity strings of the supplied members, used for endpoint membership checks.
let active_ids: BTreeSet<String> = active_members
.iter()
.map(|m| m.agent_identity.to_string())
.collect();
// Edges that are wired right now, according to each member's `wired_to`.
// Each pair is stored with the lexicographically smaller string first so
// that (a, b) and (b, a) collapse into one entry.
let mut current_edges: BTreeSet<(String, String)> = BTreeSet::new();
for member in &active_members {
for peer in &member.wired_to {
let mut a = member.agent_identity.to_string();
let mut b = peer.to_string();
if a > b {
std::mem::swap(&mut a, &mut b);
}
current_edges.insert((a, b));
}
}
// Project the members into the view type handed to edge discovery.
let member_views: Vec<EdgeMemberView> = active_members
.into_iter()
.map(|m| EdgeMemberView {
agent_identity: m.agent_identity.to_string(),
role: m.role.to_string(),
wired_to: m.wired_to.iter().map(ToString::to_string).collect(),
labels: m.labels,
})
.collect();
let raw_desired = edge_discovery.discover_edges(member_views).await;
// Desired edges as string pairs, in whatever order `endpoints()` yields them.
// NOTE(review): the lookups against `current_edges` below only match if
// `endpoints()` returns the smaller string first — confirm in `DesiredPeerEdge`.
let desired: BTreeSet<(String, String)> = raw_desired
.iter()
.map(|e| {
let (a, b) = e.endpoints();
(a.to_string(), b.to_string())
})
.collect();
let mut report = UnifiedRuntimeReconcileEdgesReport {
desired_edges: raw_desired,
..Default::default()
};
// Work from a cloned snapshot of the managed set; the read guard is a
// temporary of this statement, so it is released before the awaits below.
let managed_snapshot = self.managed_dynamic_edges.read().await.clone();
// Pass 1: classify every desired edge.
let mut to_wire = Vec::new();
for (a, b) in &desired {
// An endpoint that is not among the supplied members: report and skip.
if !active_ids.contains(a) || !active_ids.contains(b) {
if let Ok(edge) = DesiredPeerEdge::new(a.clone(), b.clone()) {
report.skipped_missing_members.push(edge);
}
continue;
}
let key = (a.clone(), b.clone());
if managed_snapshot.contains(&key) {
if current_edges.contains(&key) {
// Managed and still wired: nothing to do.
if let Ok(edge) = DesiredPeerEdge::new(a.clone(), b.clone()) {
report.retained_edges.push(edge);
}
} else {
// Managed but no longer wired: wire it again.
to_wire.push((a.clone(), b.clone(), "wire (heal)"));
}
} else if current_edges.contains(&key) {
// Wired but not in the managed set: reported, and not added to the set.
if let Ok(edge) = DesiredPeerEdge::new(a.clone(), b.clone()) {
report.preexisting_edges.push(edge);
}
} else {
to_wire.push((a.clone(), b.clone(), "wire"));
}
}
// Pass 2: managed edges that are no longer desired.
let mut stale_pruned = Vec::new();
let mut to_unwire = Vec::new();
for (a, b) in managed_snapshot
.iter()
.filter(|key| !desired.contains(*key))
.cloned()
{
let key = (a.clone(), b.clone());
// An endpoint is gone: drop the bookkeeping entry, no unwire call.
if !active_ids.contains(&a) || !active_ids.contains(&b) {
stale_pruned.push((a, b));
continue;
}
// Both endpoints exist but the edge is not wired: same treatment.
if !current_edges.contains(&key) {
stale_pruned.push((a, b));
continue;
}
to_unwire.push((a, b));
}
let handle = self.mob_handle();
// Operation label per requested edge, keyed by the pair exactly as queued.
let wire_operations = to_wire
.iter()
.map(|(a, b, operation)| ((a.clone(), b.clone()), (*operation).to_string()))
.collect::<BTreeMap<_, _>>();
// Successes carry `true` when newly wired, `false` when already wired.
let mut wire_successes = Vec::new();
let mut wire_failures = Vec::new();
if !to_wire.is_empty() {
// All wiring goes through a single batch call.
let batch_edges = to_wire
.iter()
.map(|(a, b, _)| (MeerkatId::from(a.as_str()), MeerkatId::from(b.as_str())))
.collect::<Vec<_>>();
match handle.wire_members_batch(batch_edges).await {
Ok(batch_report) => {
// Pairs the batch report accounted for, keyed as the report spells them.
let mut seen = BTreeSet::new();
for edge in batch_report.wired {
let key = (edge.a.to_string(), edge.b.to_string());
seen.insert(key.clone());
wire_successes.push((key.0, key.1, true));
}
for edge in batch_report.already_wired {
let key = (edge.a.to_string(), edge.b.to_string());
seen.insert(key.clone());
wire_successes.push((key.0, key.1, false));
}
// A requested edge missing from both lists is recorded as a failure.
// The requested pair is put smaller-string-first before the lookup.
for (a, b, operation) in to_wire {
let key = if a <= b { (a, b) } else { (b, a) };
if !seen.contains(&key) {
wire_failures.push((
key.0,
key.1,
operation.to_string(),
"wire_members_batch omitted edge from report".to_string(),
));
}
}
}
Err(err) => {
// The batch call itself failed: every requested edge fails with the same text.
let error = err.to_string();
for (a, b, operation) in to_wire {
wire_failures.push((a, b, operation.to_string(), error.clone()));
}
}
}
}
let handle = self.mob_handle();
// Unwire concurrently, at most EDGE_RECONCILE_CONCURRENCY calls in flight.
// `buffer_unordered` yields results in completion order, not input order.
let unwire_results = stream::iter(to_unwire.into_iter().map(|(a, b)| {
// Each future owns its own clone of the handle (`async move`).
let handle = handle.clone();
async move {
let result = handle
.unwire(MeerkatId::from(a.as_str()), MeerkatId::from(b.as_str()))
.await
.map_err(|err| format!("{err}"));
(a, b, result)
}
}))
.buffer_unordered(EDGE_RECONCILE_CONCURRENCY)
.collect::<Vec<_>>()
.await;
// The write lock is taken only now; there is no `.await` after this point.
let mut managed_edges = self.managed_dynamic_edges.write().await;
// Every success is recorded as managed, whether newly wired or already wired.
for (a, b, newly_wired) in wire_successes {
managed_edges.insert((a.clone(), b.clone()));
if let Ok(edge) = DesiredPeerEdge::new(a.clone(), b.clone()) {
if newly_wired {
report.wired_edges.push(edge);
} else {
report.retained_edges.push(edge);
}
}
}
for (a, b, operation, error) in wire_failures {
if let Ok(edge) = DesiredPeerEdge::new(a.clone(), b.clone()) {
report.failures.push(EdgeReconcileFailure {
edge,
// Prefer the label stored for this pair; the pair may have been
// reordered above, in which case the lookup can miss and the label
// carried with the failure is used instead.
operation: wire_operations.get(&(a, b)).cloned().unwrap_or(operation),
error,
});
}
}
// Stale entries leave the managed set without any unwire call.
for (a, b) in stale_pruned {
managed_edges.remove(&(a.clone(), b.clone()));
if let Ok(edge) = DesiredPeerEdge::new(a, b) {
report.pruned_stale_managed_edges.push(edge);
}
}
for (a, b, result) in unwire_results {
match result {
Ok(()) => {
managed_edges.remove(&(a.clone(), b.clone()));
if let Ok(edge) = DesiredPeerEdge::new(a, b) {
report.unwired_edges.push(edge);
}
}
Err(error) => {
// A failed unwire stays in the managed set; only the failure is reported.
if let Ok(edge) = DesiredPeerEdge::new(a, b) {
report.failures.push(EdgeReconcileFailure {
edge,
operation: "unwire".into(),
error,
});
}
}
}
}
report
}
/// Aligns the roster-managed runtime routes with `active_members`.
///
/// The member list is sorted and de-duplicated first. When a module with id
/// `"router"` is loaded, routes whose key starts with `ROSTER_ROUTE_PREFIX`
/// are treated as managed:
///
/// * managed routes whose recipient is not an active member are deleted;
/// * every active member that is not the recipient of any managed route gets
///   a new route keyed `ROSTER_ROUTE_PREFIX` + member id.
///
/// When the router module is not loaded, no route is touched and both key
/// lists in the report are empty.
///
/// # Errors
///
/// Returns `UnifiedRuntimeReconcileError::RouteMutation` on the first route
/// deletion or addition that fails. Mutations applied before the failure are
/// not rolled back.
pub(super) async fn reconcile_routing_wiring(
    &self,
    mut active_members: Vec<String>,
) -> Result<UnifiedRuntimeReconcileRoutingReport, UnifiedRuntimeReconcileError> {
    // Sorted and unique from here on; the binary search below relies on it.
    active_members.sort();
    active_members.dedup();

    let mut rt = self.module_runtime.lock().await;
    let router_module_loaded = rt
        .loaded_modules()
        .iter()
        .any(|module_id| module_id == "router");

    let mut added_route_keys = Vec::new();
    let mut removed_route_keys = Vec::new();

    if router_module_loaded {
        // Split the managed routes into those still pointing at an active
        // member and those whose recipient is gone.
        let (live_routes, orphaned_routes): (Vec<RuntimeRoute>, Vec<RuntimeRoute>) = rt
            .list_runtime_routes()
            .into_iter()
            .filter(|route| route.route_key.starts_with(ROSTER_ROUTE_PREFIX))
            .partition(|route| active_members.binary_search(&route.recipient).is_ok());

        for route in orphaned_routes {
            rt.delete_runtime_route(&route.route_key)
                .map_err(UnifiedRuntimeReconcileError::RouteMutation)?;
            removed_route_keys.push(route.route_key);
        }

        // Active members that already have a managed route keep it as is.
        let routed_recipients: BTreeSet<String> = live_routes
            .into_iter()
            .map(|route| route.recipient)
            .collect();

        for member_id in active_members
            .iter()
            .filter(|id| !routed_recipients.contains(*id))
        {
            let route_key = format!("{ROSTER_ROUTE_PREFIX}{member_id}");
            let new_route = RuntimeRoute {
                route_key: route_key.clone(),
                recipient: member_id.clone(),
                channel: Some(ROSTER_ROUTE_CHANNEL.to_string()),
                sink: ROSTER_ROUTE_SINK.to_string(),
                target_module: ROSTER_ROUTE_TARGET_MODULE.to_string(),
                retry_max: None,
                backoff_ms: None,
                rate_limit_per_minute: None,
            };
            rt.add_runtime_route(new_route)
                .map_err(UnifiedRuntimeReconcileError::RouteMutation)?;
            added_route_keys.push(route_key);
        }
    }

    added_route_keys.sort();
    removed_route_keys.sort();

    Ok(UnifiedRuntimeReconcileRoutingReport {
        router_module_loaded,
        active_members,
        added_route_keys,
        removed_route_keys,
    })
}
}