use std::marker::PhantomData;
use crate::core::io::{ConnectionDetails, ConnectionInfo};
use crate::core::marker::Unset;
use crate::core::utils::UniqueId;
use crate::sync::io::ConnectionBuilder;
use crate::sync::marker::ConnConf;
use crate::prelude::*;
impl Network<Versionless, Unset> {
pub fn sync<V: MaybeVersioned>() -> Network<V, ConnConf<V>> {
Network {
info: ConnectionInfo::new(ConnectionDetails::Network),
nodes: Default::default(),
retry: Default::default(),
stop_on_node_down: Default::default(),
_version: PhantomData,
}
}
}
impl<V: MaybeVersioned> Network<V, ConnConf<V>> {
pub fn add_connection(
mut self,
conn_conf: impl ConnectionBuilder<V> + 'static,
) -> Network<V, ConnConf<V>> {
let node = Node::sync::<V>().connection(conn_conf).conf();
self.nodes.insert(UniqueId::new(), node);
Network {
nodes: self.nodes,
..self
}
}
pub fn add_signed_connection(
mut self,
conn_conf: impl ConnectionBuilder<V> + 'static,
signer: FrameSigner,
) -> Network<V, ConnConf<V>> {
let node = Node::sync::<V>()
.connection(conn_conf)
.signer(signer)
.conf();
self.nodes.insert(UniqueId::new(), node);
Network {
nodes: self.nodes,
..self
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
use crate::core::consts::SERVER_HANG_UP_TIMEOUT;
use crate::core::io::RetryStrategy;
use crate::core::utils::net::pick_unused_port;
use crate::protocol::dialects::minimal::messages::Heartbeat;
use crate::sync::prelude::*;
const RECONNECT_INTERVAL: Duration = SERVER_HANG_UP_TIMEOUT;
const WAIT_DURATION: Duration = Duration::from_millis(100);
const RECV_TIMEOUT: Duration = WAIT_DURATION;
fn wait() {
thread::sleep(WAIT_DURATION);
}
#[test]
fn basic_network_workflow() {
let addr_1 = format!("127.0.0.1:{}", pick_unused_port().unwrap());
let addr_2 = format!("127.0.0.1:{}", pick_unused_port().unwrap());
let network = Network::sync()
.add_node(Node::sync::<V2>().connection(TcpServer::new(addr_1.as_str()).unwrap()))
.add_connection(TcpServer::new(addr_2.as_str()).unwrap());
assert_eq!(network.nodes.len(), 2);
let server = Node::sync::<V2>()
.id(MavLinkId::new(1, 0))
.connection(network)
.build()
.unwrap();
wait();
let client_1 = Node::sync::<V2>()
.id(MavLinkId::new(1, 1))
.connection(TcpClient::new(addr_1.as_str()).unwrap())
.build()
.unwrap();
let client_2 = Node::sync::<V2>()
.id(MavLinkId::new(1, 2))
.connection(TcpClient::new(addr_2.as_str()).unwrap())
.build()
.unwrap();
server.send(&Heartbeat::default()).unwrap();
let (frame, _) = client_1.recv_frame_timeout(RECV_TIMEOUT).unwrap();
assert_eq!(frame.system_id(), 1);
assert_eq!(frame.component_id(), 0);
let (frame, _) = client_2.recv_frame_timeout(RECV_TIMEOUT).unwrap();
assert_eq!(frame.system_id(), 1);
assert_eq!(frame.component_id(), 0);
client_1.send(&Heartbeat::default()).unwrap();
let (frame, _) = server.recv_frame_timeout(RECV_TIMEOUT).unwrap();
assert_eq!(frame.system_id(), 1);
assert_eq!(frame.component_id(), 1);
}
#[test]
fn network_reconnect() {
let addr = format!("127.0.0.1:{}", pick_unused_port().unwrap());
let server_conf = Node::sync::<V2>()
.id(MavLinkId::new(1, 0))
.connection(TcpServer::new(addr.as_str()).unwrap())
.conf();
let server = Node::try_from_conf(server_conf.clone()).unwrap();
wait();
let network = Network::sync()
.add_connection(TcpClient::new(addr.as_str()).unwrap())
.retry(RetryStrategy::Always(RECONNECT_INTERVAL));
let client = Node::sync::<V2>()
.id(MavLinkId::new(1, 1))
.connection(network)
.build()
.unwrap();
wait();
drop(server);
wait();
let server = Node::try_from_conf(server_conf.clone()).unwrap();
client.send(&Heartbeat::default()).unwrap();
wait();
client.send(&Heartbeat::default()).unwrap();
server.recv_frame_timeout(RECV_TIMEOUT).unwrap();
}
}