use crate::{
distributed_network::DistributedNetwork,
graph::types::GraphExport,
multi_graph::{CompositionConnection, CompositionEndpoint, GraphComposition, GraphSource},
};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
/// Serializable description of a graph that lives on another network.
///
/// NOTE(review): none of these fields are read by the code visible in this
/// file; the field notes below are inferred from the names and should be
/// confirmed against the code that consumes this config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteGraphConfig {
/// Presumably the identifier of the network that hosts the graph.
pub network_id: String,
/// Presumably the name of the graph on that network.
pub graph_name: String,
/// Optional execution target; semantics are not visible here (TODO confirm).
pub execution_target: Option<String>,
}
/// A directed connection between two [`DistributedEndpoint`]s, each of which
/// may name the network it belongs to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedConnection {
/// Endpoint the connection starts from.
pub from: DistributedEndpoint,
/// Endpoint the connection leads to.
pub to: DistributedEndpoint,
/// Optional free-form metadata; it is cloned unchanged onto any
/// [`CrossNetworkEdge`] derived from this connection.
pub metadata: Option<HashMap<String, Value>>,
}
/// One side of a [`DistributedConnection`]: a process/port pair with an
/// optional network qualifier.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedEndpoint {
/// Network the process belongs to. `plan_distributed_composition` treats
/// `None` as "the local network".
pub network_id: Option<String>,
/// Process name; `qualified_name` joins it to `network_id` with a `/`.
pub process: String,
/// Port name on the process.
pub port: String,
/// Optional port index. It is not read by the code visible in this file.
pub index: Option<usize>,
}
impl DistributedEndpoint {
    /// Returns `"{network_id}/{process}"` when a network id is set, otherwise
    /// just a copy of the process name.
    pub fn qualified_name(&self) -> String {
        self.network_id.as_ref().map_or_else(
            || self.process.clone(),
            |network| format!("{}/{}", network, self.process),
        )
    }

    /// Reports whether this endpoint carries an explicit network id.
    ///
    /// The id is not compared against any local network id, so an endpoint
    /// that explicitly names the local network also counts as "remote" here.
    pub fn is_remote(&self) -> bool {
        matches!(self.network_id, Some(_))
    }
}
/// Input to `plan_distributed_composition`: local graph sources and
/// connections plus connections that may span networks.
#[derive(Debug, Clone)]
pub struct DistributedGraphComposition {
/// Local graph sources; cloned into the planned `GraphComposition::sources`.
pub local_sources: Vec<GraphSource>,
/// Remote graph descriptions. The planner visible in this file does not read them.
pub remote_sources: Vec<RemoteGraphConfig>,
/// Connections between local processes; the planner starts its connection
/// list from a clone of these.
pub local_connections: Vec<CompositionConnection>,
/// Connections whose endpoints may live on different networks; the planner
/// scans these for cross-network edges.
pub distributed_connections: Vec<DistributedConnection>,
/// Properties cloned verbatim into the planned local composition.
pub properties: HashMap<String, Value>,
/// Cloned verbatim into `DistributedCompositionPlan::remote_executions`.
/// The meaning of keys and values is not visible here (TODO confirm).
pub execution_targets: HashMap<String, String>,
}
/// Index of known processes across the local and remote networks.
pub struct DistributedNamespaceResolver {
// Network id used as the key prefix for graphs registered as local.
local_network_id: String,
// Process locations keyed by "{network_id}/{namespace}/{process_name}".
process_locations: HashMap<String, ProcessLocation>,
}
/// Where a registered process lives, as recorded by
/// `DistributedNamespaceResolver`.
#[derive(Debug, Clone)]
pub struct ProcessLocation {
/// Network the process was registered under.
pub network_id: String,
/// Namespace supplied when the owning graph was registered.
pub namespace: String,
/// Process name as it appears in the graph's process map.
pub process_name: String,
/// `true` when registered via `register_local_graph`, `false` when
/// registered via `register_remote_graph`.
pub is_local: bool,
}
impl DistributedNamespaceResolver {
    /// Creates an empty resolver that treats `local_network_id` as the local
    /// network.
    pub fn new(local_network_id: &str) -> Self {
        DistributedNamespaceResolver {
            local_network_id: local_network_id.to_string(),
            process_locations: HashMap::new(),
        }
    }

    /// Registers every process of `graph` under
    /// `"{local_network_id}/{namespace}/{process}"` and marks it as local.
    ///
    /// A process registered earlier under the same key is overwritten.
    pub fn register_local_graph(&mut self, namespace: &str, graph: &GraphExport) {
        // Cloned so the id can be borrowed while `self` is mutably borrowed.
        let network_id = self.local_network_id.clone();
        self.register_graph(&network_id, namespace, graph, true);
    }

    /// Registers every process of `graph` under
    /// `"{network_id}/{namespace}/{process}"` and marks it as non-local.
    ///
    /// A process registered earlier under the same key is overwritten.
    pub fn register_remote_graph(
        &mut self,
        network_id: &str,
        namespace: &str,
        graph: &GraphExport,
    ) {
        self.register_graph(network_id, namespace, graph, false);
    }

    /// Shared registration logic for local and remote graphs.
    fn register_graph(
        &mut self,
        network_id: &str,
        namespace: &str,
        graph: &GraphExport,
        is_local: bool,
    ) {
        for process_name in graph.processes.keys() {
            self.process_locations.insert(
                format!("{}/{}/{}", network_id, namespace, process_name),
                ProcessLocation {
                    network_id: network_id.to_string(),
                    namespace: namespace.to_string(),
                    process_name: process_name.clone(),
                    is_local,
                },
            );
        }
    }

    /// Looks up the registered location of `endpoint`.
    ///
    /// The endpoint's qualified name is tried first. If that misses and the
    /// endpoint has no `network_id`, the lookup is retried under the local
    /// network id, so an endpoint written as `"{namespace}/{process}"` finds
    /// a process registered through [`Self::register_local_graph`]. This
    /// matches `plan_distributed_composition`, which also treats a missing
    /// network id as the local network.
    ///
    /// Returns `None` when no registered process matches.
    pub fn resolve(&self, endpoint: &DistributedEndpoint) -> Option<&ProcessLocation> {
        let qualified = endpoint.qualified_name();
        if let Some(location) = self.process_locations.get(&qualified) {
            return Some(location);
        }
        if endpoint.network_id.is_some() {
            return None;
        }
        // Without a network id the qualified name has no network prefix, but
        // every registered key starts with one; retry with the local id.
        let local_key = format!("{}/{}", self.local_network_id, qualified);
        self.process_locations.get(&local_key)
    }

    /// Returns one [`CrossNetworkEdge`] for each connection whose two
    /// endpoints resolve to locations on different networks.
    ///
    /// Connections with an unresolvable endpoint, or with both endpoints on
    /// the same network, are skipped. Input order is preserved.
    pub fn find_cross_network_connections(
        &self,
        connections: &[DistributedConnection],
    ) -> Vec<CrossNetworkEdge> {
        connections
            .iter()
            .filter_map(|conn| {
                let from_loc = self.resolve(&conn.from)?;
                let to_loc = self.resolve(&conn.to)?;
                if from_loc.network_id == to_loc.network_id {
                    return None;
                }
                Some(CrossNetworkEdge {
                    from_network: from_loc.network_id.clone(),
                    to_network: to_loc.network_id.clone(),
                    from_process: conn.from.process.clone(),
                    to_process: conn.to.process.clone(),
                    from_port: conn.from.port.clone(),
                    to_port: conn.to.port.clone(),
                    metadata: conn.metadata.clone(),
                })
            })
            .collect()
    }
}
/// A connection whose source and destination are on different networks.
#[derive(Debug, Clone)]
pub struct CrossNetworkEdge {
/// Network id of the source side.
pub from_network: String,
/// Network id of the destination side.
pub to_network: String,
/// Source process name, copied from the originating connection's `from.process`.
pub from_process: String,
/// Destination process name, copied from the originating connection's `to.process`.
pub to_process: String,
/// Source port name.
pub from_port: String,
/// Destination port name.
pub to_port: String,
/// Metadata cloned from the originating connection, if any.
pub metadata: Option<HashMap<String, Value>>,
}
impl CrossNetworkEdge {
    /// Builds the name of the proxy actor that stands in for this edge's
    /// destination, formatted as `"{to_process}@{to_network}"`.
    pub fn proxy_actor_name(&self) -> String {
        [self.to_process.as_str(), self.to_network.as_str()].join("@")
    }
}
/// Result of `plan_distributed_composition`.
#[derive(Debug)]
pub struct DistributedCompositionPlan {
/// Composition to run locally: the original local connections plus one
/// connection to a proxy actor for each edge leaving the local network.
pub local_composition: GraphComposition,
/// One spec per distinct proxy actor name needed by outgoing edges.
pub proxy_actors: Vec<ProxyActorSpec>,
/// Every distributed connection whose endpoints are on different networks,
/// in either direction.
pub cross_network_edges: Vec<CrossNetworkEdge>,
/// Clone of the input composition's `execution_targets`.
pub remote_executions: HashMap<String, String>,
}
/// Describes a proxy actor that stands in for a process on another network.
#[derive(Debug, Clone)]
pub struct ProxyActorSpec {
/// Name of the proxy in the local composition (`"{process}@{network}"`,
/// as produced by `CrossNetworkEdge::proxy_actor_name`).
pub proxy_name: String,
/// Network that hosts the real actor.
pub remote_network_id: String,
/// Process name of the real actor on the remote network.
pub remote_actor_id: String,
}
pub fn plan_distributed_composition(
composition: &DistributedGraphComposition,
local_network_id: &str,
) -> DistributedCompositionPlan {
let mut cross_network_edges = Vec::new();
for conn in &composition.distributed_connections {
let from_net = conn.from.network_id.as_deref().unwrap_or(local_network_id);
let to_net = conn.to.network_id.as_deref().unwrap_or(local_network_id);
if from_net != to_net {
cross_network_edges.push(CrossNetworkEdge {
from_network: from_net.to_string(),
to_network: to_net.to_string(),
from_process: conn.from.process.clone(),
to_process: conn.to.process.clone(),
from_port: conn.from.port.clone(),
to_port: conn.to.port.clone(),
metadata: conn.metadata.clone(),
});
}
}
let mut proxy_actors = Vec::new();
let mut seen_proxies = std::collections::HashSet::new();
for edge in &cross_network_edges {
if edge.from_network == local_network_id && edge.to_network != local_network_id {
let proxy_name = edge.proxy_actor_name();
if seen_proxies.insert(proxy_name.clone()) {
proxy_actors.push(ProxyActorSpec {
proxy_name,
remote_network_id: edge.to_network.clone(),
remote_actor_id: edge.to_process.clone(),
});
}
}
}
let mut local_connections = composition.local_connections.clone();
for edge in &cross_network_edges {
if edge.from_network == local_network_id {
local_connections.push(CompositionConnection {
from: CompositionEndpoint {
process: edge.from_process.clone(),
port: edge.from_port.clone(),
index: None,
},
to: CompositionEndpoint {
process: edge.proxy_actor_name(),
port: edge.to_port.clone(),
index: None,
},
metadata: edge.metadata.clone(),
});
}
}
let remote_executions = composition.execution_targets.clone();
let local_composition = GraphComposition {
sources: composition.local_sources.clone(),
connections: local_connections,
shared_resources: vec![],
properties: composition.properties.clone(),
case_sensitive: None,
metadata: None,
};
DistributedCompositionPlan {
local_composition,
proxy_actors,
cross_network_edges,
remote_executions,
}
}
pub async fn execute_distributed_plan(
network: &DistributedNetwork,
plan: &DistributedCompositionPlan,
) -> Result<()> {
for proxy_spec in &plan.proxy_actors {
tracing::info!(
"Creating proxy actor '{}' -> {}::{}",
proxy_spec.proxy_name,
proxy_spec.remote_network_id,
proxy_spec.remote_actor_id
);
network
.register_remote_actor(&proxy_spec.remote_actor_id, &proxy_spec.remote_network_id)
.await?;
}
tracing::info!(
"Distributed plan executed: {} proxy actors created, {} cross-network edges",
plan.proxy_actors.len(),
plan.cross_network_edges.len(),
);
Ok(())
}