// qudag_protocol/coordinator.rs
use crate::{
    config::Config as ProtocolConfig,
    message::{Message, MessageType},
    node::{Node, NodeConfig},
    types::{ProtocolError, ProtocolEvent, ProtocolState},
};
use qudag_crypto::KeyPair;
use qudag_dag::QrDag;
use qudag_network::NetworkManager;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info};

19pub struct Coordinator {
21 node: Node,
23 #[allow(dead_code)]
25 config: ProtocolConfig,
26 state: Arc<RwLock<ProtocolState>>,
28 #[allow(dead_code)]
30 events: CoordinatorEvents,
31 crypto: Option<KeyPair>,
33 network: Option<NetworkManager>,
35 dag: Option<QrDag>,
37}
38
39struct CoordinatorEvents {
41 #[allow(dead_code)]
43 tx: mpsc::Sender<ProtocolEvent>,
44 #[allow(dead_code)]
46 rx: mpsc::Receiver<ProtocolEvent>,
47}
48
49impl Coordinator {
50 pub async fn new(config: ProtocolConfig) -> Result<Self, ProtocolError> {
52 let node_config = NodeConfig {
53 data_dir: config.node.data_dir.clone(),
54 network_port: config.network.port,
55 max_peers: config.network.max_peers,
56 initial_peers: Vec::new(),
57 };
58
59 let node = Node::new(node_config).await?;
60 let (tx, rx) = mpsc::channel(1000);
61
62 Ok(Self {
63 node,
64 config,
65 state: Arc::new(RwLock::new(ProtocolState::Initial)),
66 events: CoordinatorEvents { tx, rx },
67 crypto: None,
68 network: None,
69 dag: None,
70 })
71 }
72
73 pub async fn start(&mut self) -> Result<(), ProtocolError> {
75 info!("Starting protocol coordinator...");
76
77 self.init_crypto().await?;
79 self.init_network().await?;
80 self.init_dag().await?;
81
82 self.node.start().await?;
84
85 {
87 let mut state = self.state.write().await;
88 *state = ProtocolState::Running;
89 }
90
91 info!("Protocol coordinator started successfully");
92 Ok(())
93 }
94
95 pub async fn stop(&mut self) -> Result<(), ProtocolError> {
97 info!("Stopping protocol coordinator...");
98
99 {
101 let mut state = self.state.write().await;
102 *state = ProtocolState::Stopping;
103 }
104
105 self.node.stop().await?;
107
108 {
110 let mut state = self.state.write().await;
111 *state = ProtocolState::Stopped;
112 }
113
114 info!("Protocol coordinator stopped successfully");
115 Ok(())
116 }
117
118 pub async fn state(&self) -> ProtocolState {
120 self.state.read().await.clone()
121 }
122
123 pub fn is_initialized(&self) -> bool {
125 self.crypto.is_some() && self.network.is_some() && self.dag.is_some()
126 }
127
128 pub async fn broadcast_message(&mut self, message: Vec<u8>) -> Result<(), ProtocolError> {
130 debug!("Broadcasting message of {} bytes", message.len());
131
132 let proto_message =
134 Message::new(crate::message::MessageType::Data(message.clone()), vec![]);
135
136 if let Some(ref _crypto) = self.crypto {
138 }
141
142 self.node
144 .handle_message(proto_message)
145 .await
146 .map_err(|e| ProtocolError::Internal(e.to_string()))?;
147
148 if let Some(ref mut dag) = self.dag {
150 dag.add_message(message)
151 .map_err(|e| ProtocolError::Internal(e.to_string()))?;
152 }
153
154 Ok(())
155 }
156
157 pub fn crypto_manager(&self) -> Option<&KeyPair> {
159 self.crypto.as_ref()
160 }
161
162 pub fn network_manager(&self) -> Option<&NetworkManager> {
164 self.network.as_ref()
165 }
166
167 pub fn dag_manager(&self) -> Option<&QrDag> {
169 self.dag.as_ref()
170 }
171
172 async fn init_crypto(&mut self) -> Result<(), ProtocolError> {
174 self.crypto = Some(KeyPair::new());
177 Ok(())
178 }
179
180 async fn init_network(&mut self) -> Result<(), ProtocolError> {
182 self.network = Some(NetworkManager::new());
185 Ok(())
186 }
187
188 async fn init_dag(&mut self) -> Result<(), ProtocolError> {
190 self.dag = Some(QrDag::new());
193 Ok(())
194 }
195}
196
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a coordinator through `Initial` -> `Running` -> `Stopped`.
    #[tokio::test]
    async fn test_coordinator_lifecycle() {
        let mut coord = Coordinator::new(ProtocolConfig::default())
            .await
            .unwrap();
        assert_eq!(coord.state().await, ProtocolState::Initial);

        coord.start().await.unwrap();
        assert_eq!(coord.state().await, ProtocolState::Running);

        coord.stop().await.unwrap();
        assert_eq!(coord.state().await, ProtocolState::Stopped);
    }

    /// A coordinator that has only been constructed reports itself as not
    /// initialized.
    #[tokio::test]
    async fn test_coordinator_initialization() {
        let coord = Coordinator::new(ProtocolConfig::default())
            .await
            .unwrap();

        assert!(!coord.is_initialized());
    }
}