liquid_ml/network/
server.rs1use crate::error::LiquidError;
4use crate::network::{message, Connection, ControlMsg, Message, MessageCodec};
5use log::info;
6use std::collections::HashMap;
7use std::net::SocketAddr;
8use tokio::io::split;
9use tokio::net::TcpListener;
10use tokio_util::codec::{FramedRead, FramedWrite};
11
12#[derive(Debug)]
14pub struct Server {
15 pub(crate) address: SocketAddr,
17 pub(crate) msg_id: usize,
19 pub(crate) directory:
24 HashMap<String, HashMap<usize, Connection<ControlMsg>>>,
25}
26
27impl Server {
28 pub async fn new(address: &str) -> Result<Self, LiquidError> {
31 Ok(Server {
32 msg_id: 0,
33 directory: HashMap::new(),
34 address: address.parse().unwrap(),
35 })
36 }
37
38 pub async fn accept_new_connections(&mut self) -> Result<(), LiquidError> {
47 let mut listener = TcpListener::bind(&self.address).await?;
48 loop {
49 let (socket, _) = listener.accept().await?;
51 let (reader, writer) = split(socket);
52 let mut stream = FramedRead::new(reader, MessageCodec::new());
53 let sink = FramedWrite::new(writer, MessageCodec::new());
54 let address = message::read_msg(&mut stream).await?;
56 let (address, network_name) = if let ControlMsg::Introduction {
57 address,
58 network_name,
59 } = address.msg
60 {
61 (address, network_name)
62 } else {
63 return Err(LiquidError::UnexpectedMessage);
64 };
65 let conn = Connection { address, sink };
66
67 let target_id;
68 let dir;
69 match self.directory.get_mut(&network_name) {
70 Some(d) => {
71 target_id = d.len() + 1; dir = d.iter().map(|(k, v)| (*k, v.address)).collect();
74 d.insert(target_id, conn);
75 }
76 None => {
77 target_id = 1;
78 dir = Vec::new();
79 let mut d = HashMap::new();
80 d.insert(target_id, conn);
81 self.directory.insert(network_name.clone(), d);
82 }
83 };
84
85 info!(
86 "Connected to address: {:#?} joining network {:#?}, assigning id: {:#?}",
87 &address,
88 &network_name,
89 target_id
90 );
91
92 let dir_msg = ControlMsg::Directory { dir };
94 self.send_msg(target_id, &network_name, dir_msg).await?;
95 }
96 }
97
98 pub async fn send_msg(
103 &mut self,
104 target_id: usize,
105 network_name: &str,
106 message: ControlMsg,
107 ) -> Result<(), LiquidError> {
108 let m = Message::new(self.msg_id, 0, target_id, message);
109 message::send_msg(
110 target_id,
111 m,
112 self.directory.get_mut(network_name).unwrap(),
113 )
114 .await?;
115 self.msg_id += 1;
116 Ok(())
117 }
118
119 pub async fn broadcast(
124 &mut self,
125 message: ControlMsg,
126 network_name: &str,
127 ) -> Result<(), LiquidError> {
128 let d: Vec<usize> = self
129 .directory
130 .iter()
131 .find(|(k, _)| **k == network_name)
132 .unwrap()
133 .1
134 .iter()
135 .map(|(k, _)| *k)
136 .collect();
137 for k in d {
138 self.send_msg(k, network_name, message.clone()).await?;
139 }
140 Ok(())
141 }
142}