use reflow_network::{
actor::{Actor, ActorConfig, ActorContext, ActorLoad, MemoryState, Port},
distributed_network::{DistributedConfig, DistributedNetwork},
message::Message,
network::NetworkConfig,
tracing::TracingIntegration,
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::time::sleep;
struct BidirectionalActor {
prefix: String,
inports: Port,
outports: Port,
load: Arc<ActorLoad>,
}
impl BidirectionalActor {
fn new(prefix: String) -> Self {
Self {
prefix,
inports: flume::unbounded(),
outports: flume::unbounded(),
load: Arc::new(ActorLoad::new(0)),
}
}
}
impl Actor for BidirectionalActor {
fn get_behavior(&self) -> reflow_network::actor::ActorBehavior {
let prefix = self.prefix.clone();
Box::new(move |context| {
let prefix = prefix.clone();
Box::pin(async move {
let payload = context.get_payload();
let mut output = HashMap::new();
for (port, message) in payload.iter() {
match port.as_str() {
"input" => {
let response = match message {
Message::String(s) => {
tracing::info!("[{}] Received: {}", prefix, s);
Message::String(format!("[{}] Processed: {}", prefix, s).into())
}
_ => {
tracing::info!("[{}] Received message: {:?}", prefix, message);
Message::String(
format!("[{}] Processed: {:?}", prefix, message).into(),
)
}
};
output.insert("output".to_string(), response);
}
"response" => {
match message {
Message::String(s) => {
tracing::info!("[{}] 🎉 Received response: {}", prefix, s);
}
_ => {
tracing::info!(
"[{}] 🎉 Received response: {:?}",
prefix,
message
);
}
}
}
_ => {
tracing::debug!("[{}] Unhandled port: {}", prefix, port);
}
}
}
Ok(output)
})
})
}
fn get_inports(&self) -> reflow_network::actor::Port {
self.inports.clone()
}
fn get_outports(&self) -> reflow_network::actor::Port {
self.outports.clone()
}
fn load_count(&self) -> Arc<ActorLoad> {
self.load.clone()
}
fn create_process(
&self,
actor_config: ActorConfig,
_tracing_integration: Option<TracingIntegration>,
) -> std::pin::Pin<Box<dyn futures::Future<Output = ()> + 'static + Send>> {
use futures::StreamExt;
let behavior = self.get_behavior();
let (_, receiver) = self.get_inports();
let outports = self.get_outports();
let load = self.load_count();
let prefix = self.prefix.clone();
Box::pin(async move {
tracing::info!(
"[{}] Actor process started, waiting for messages...",
prefix
);
loop {
tracing::debug!("[{}] Waiting for next message...", prefix);
if let Some(packet) = receiver.stream().next().await {
tracing::info!(
"[{}] 📨 Received packet with {} messages",
prefix,
packet.len()
);
let context = ActorContext::new(
packet,
outports.clone(),
Arc::new(parking_lot::Mutex::new(MemoryState::default())),
actor_config.clone(),
load.clone(),
);
if let Ok(result) = behavior(context).await {
if !result.is_empty() {
tracing::info!(
"[{}] 📤 Sending {} output messages",
prefix,
result.len()
);
outports
.0
.send(result)
.expect("Expected to send message via outport");
load.reset();
} else {
tracing::debug!("[{}] No output messages to send", prefix);
}
} else {
tracing::error!("[{}] Error processing messages", prefix);
}
} else {
tracing::warn!(
"[{}] Received None from message stream, actor stopping",
prefix
);
break;
}
}
tracing::info!("[{}] Actor process ended", prefix);
})
}
}
#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
println!("🚀 Starting Distributed Network Example");
let server_config = DistributedConfig {
network_id: "server_network".to_string(),
instance_id: "server_001".to_string(),
bind_address: "127.0.0.1".to_string(),
bind_port: 9090,
discovery_endpoints: vec![],
auth_token: Some("server_token".to_string()),
max_connections: 10,
heartbeat_interval_ms: 30000,
local_network_config: NetworkConfig::default(),
};
let client_config = DistributedConfig {
network_id: "client_network".to_string(),
instance_id: "client_001".to_string(),
bind_address: "127.0.0.1".to_string(),
bind_port: 9091,
discovery_endpoints: vec![],
auth_token: Some("client_token".to_string()),
max_connections: 10,
heartbeat_interval_ms: 30000,
local_network_config: NetworkConfig::default(),
};
let mut server_network = DistributedNetwork::new(server_config).await?;
let mut client_network = DistributedNetwork::new(client_config).await?;
println!("📡 Created server and client networks");
println!("🌐 Started server and client networks");
sleep(Duration::from_secs(1)).await;
{
let server_actor = BidirectionalActor::new("SERVER".to_string());
server_network.register_local_actor("server_actor", server_actor, None)?;
server_network.start().await?;
}
{
let client_actor = BidirectionalActor::new("CLIENT".to_string());
client_network.register_local_actor("client_actor", client_actor, None)?;
client_network.start().await?;
}
println!("🎭 Registered local actors on both networks");
println!("🔌 Establishing connection from client to server...");
client_network.connect_to_network("127.0.0.1:9090").await?;
sleep(Duration::from_secs(1)).await;
println!("✅ Connection established!");
client_network
.register_remote_actor("server_actor", "server_network")
.await?;
println!("🔗 Registered remote actor on client");
println!("\n🚀 Step 1: Client → Server (via proxy)");
let test_message = Message::String("Hello from client!".to_string().into());
{
let client_net = client_network.get_local_network();
let net = client_net.read();
net.send_to_actor("server_actor@server_network", "input", test_message)?;
}
println!("📤 Client sent message to server via proxy");
sleep(Duration::from_secs(5)).await;
println!("\n🚀 Step 2: Server → Client");
server_network
.register_remote_actor("client_actor", "client_network")
.await?;
sleep(Duration::from_secs(2)).await;
let response_message = Message::String("Hello from server!".to_string().into());
{
let server_net = server_network.get_local_network();
let net = server_net.read();
net.send_to_actor("client_actor@client_network", "input", response_message)?;
}
println!("📤 Server sent message to client via proxy");
sleep(Duration::from_secs(10)).await;
println!("✅ Distributed network example completed successfully!");
println!(" - Server network running on 127.0.0.1:9080");
println!(" - Client network running on 127.0.0.1:8081");
println!(" - Cross-network communication established");
println!("\n🔽 Shutting down networks...");
if let Err(e) = client_network.shutdown().await {
println!("⚠️ Error shutting down client network: {}", e);
} else {
println!("✅ Client network shut down successfully");
}
if let Err(e) = server_network.shutdown().await {
println!("⚠️ Error shutting down server network: {}", e);
} else {
println!("✅ Server network shut down successfully");
}
println!("🎯 All networks shut down cleanly!");
Ok(())
}