qudag_protocol/
coordinator.rs

1//! Protocol coordinator implementation.
2//!
3//! The coordinator provides a high-level interface for managing protocol operations,
4//! integrating crypto, network, and DAG components.
5
6use crate::{
7    config::Config as ProtocolConfig,
8    message::Message,
9    node::{Node, NodeConfig},
10    types::{ProtocolError, ProtocolEvent, ProtocolState},
11};
12use qudag_crypto::KeyPair;
13use qudag_dag::QrDag;
14use qudag_network::NetworkManager;
15use std::sync::Arc;
16use tokio::sync::{mpsc, RwLock};
17use tracing::{debug, info};
18
19/// Protocol coordinator
20pub struct Coordinator {
21    /// Internal node instance
22    node: Node,
23    /// Protocol configuration  
24    #[allow(dead_code)]
25    config: ProtocolConfig,
26    /// Current state
27    state: Arc<RwLock<ProtocolState>>,
28    /// Event channels
29    #[allow(dead_code)]
30    events: CoordinatorEvents,
31    /// Crypto manager
32    crypto: Option<KeyPair>,
33    /// Network manager
34    network: Option<NetworkManager>,
35    /// DAG manager
36    dag: Option<QrDag>,
37}
38
39/// Coordinator event channels
40struct CoordinatorEvents {
41    /// Event sender
42    #[allow(dead_code)]
43    tx: mpsc::Sender<ProtocolEvent>,
44    /// Event receiver  
45    #[allow(dead_code)]
46    rx: mpsc::Receiver<ProtocolEvent>,
47}
48
49impl Coordinator {
50    /// Create new coordinator
51    pub async fn new(config: ProtocolConfig) -> Result<Self, ProtocolError> {
52        let node_config = NodeConfig {
53            data_dir: config.node.data_dir.clone(),
54            network_port: config.network.port,
55            max_peers: config.network.max_peers,
56            initial_peers: Vec::new(),
57        };
58
59        let node = Node::new(node_config).await?;
60        let (tx, rx) = mpsc::channel(1000);
61
62        Ok(Self {
63            node,
64            config,
65            state: Arc::new(RwLock::new(ProtocolState::Initial)),
66            events: CoordinatorEvents { tx, rx },
67            crypto: None,
68            network: None,
69            dag: None,
70        })
71    }
72
73    /// Start coordinator
74    pub async fn start(&mut self) -> Result<(), ProtocolError> {
75        info!("Starting protocol coordinator...");
76
77        // Initialize components
78        self.init_crypto().await?;
79        self.init_network().await?;
80        self.init_dag().await?;
81
82        // Start internal node
83        self.node.start().await?;
84
85        // Update state
86        {
87            let mut state = self.state.write().await;
88            *state = ProtocolState::Running;
89        }
90
91        info!("Protocol coordinator started successfully");
92        Ok(())
93    }
94
95    /// Stop coordinator
96    pub async fn stop(&mut self) -> Result<(), ProtocolError> {
97        info!("Stopping protocol coordinator...");
98
99        // Update state
100        {
101            let mut state = self.state.write().await;
102            *state = ProtocolState::Stopping;
103        }
104
105        // Stop internal node
106        self.node.stop().await?;
107
108        // Update state
109        {
110            let mut state = self.state.write().await;
111            *state = ProtocolState::Stopped;
112        }
113
114        info!("Protocol coordinator stopped successfully");
115        Ok(())
116    }
117
118    /// Get current state
119    pub async fn state(&self) -> ProtocolState {
120        self.state.read().await.clone()
121    }
122
123    /// Check if coordinator is initialized
124    pub fn is_initialized(&self) -> bool {
125        self.crypto.is_some() && self.network.is_some() && self.dag.is_some()
126    }
127
128    /// Broadcast message
129    pub async fn broadcast_message(&mut self, message: Vec<u8>) -> Result<(), ProtocolError> {
130        debug!("Broadcasting message of {} bytes", message.len());
131
132        // Create protocol message
133        let proto_message =
134            Message::new(crate::message::MessageType::Data(message.clone()), vec![]);
135
136        // Sign message if crypto is available
137        if let Some(ref _crypto) = self.crypto {
138            // TODO: Use proper keypair for signing
139            // proto_message.sign(&proper_keypair).map_err(|e| ProtocolError::CryptoError(e.to_string()))?;
140        }
141
142        // Handle message through node
143        self.node
144            .handle_message(proto_message)
145            .await
146            .map_err(|e| ProtocolError::Internal(e.to_string()))?;
147
148        // Add to DAG if available
149        if let Some(ref mut dag) = self.dag {
150            dag.add_message(message)
151                .map_err(|e| ProtocolError::Internal(e.to_string()))?;
152        }
153
154        Ok(())
155    }
156
157    /// Get crypto manager
158    pub fn crypto_manager(&self) -> Option<&KeyPair> {
159        self.crypto.as_ref()
160    }
161
162    /// Get network manager
163    pub fn network_manager(&self) -> Option<&NetworkManager> {
164        self.network.as_ref()
165    }
166
167    /// Get DAG manager
168    pub fn dag_manager(&self) -> Option<&QrDag> {
169        self.dag.as_ref()
170    }
171
172    // Initialize crypto components
173    async fn init_crypto(&mut self) -> Result<(), ProtocolError> {
174        // TODO: Initialize proper crypto manager
175        // For now, create a placeholder
176        self.crypto = Some(KeyPair::new());
177        Ok(())
178    }
179
180    // Initialize network components
181    async fn init_network(&mut self) -> Result<(), ProtocolError> {
182        // TODO: Initialize proper network manager
183        // For now, create a placeholder
184        self.network = Some(NetworkManager::new());
185        Ok(())
186    }
187
188    // Initialize DAG components
189    async fn init_dag(&mut self) -> Result<(), ProtocolError> {
190        // TODO: Initialize proper DAG manager
191        // For now, create a placeholder
192        self.dag = Some(QrDag::new());
193        Ok(())
194    }
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200
201    #[tokio::test]
202    async fn test_coordinator_lifecycle() {
203        let config = ProtocolConfig::default();
204        let mut coordinator = Coordinator::new(config).await.unwrap();
205
206        assert_eq!(coordinator.state().await, ProtocolState::Initial);
207
208        coordinator.start().await.unwrap();
209        assert_eq!(coordinator.state().await, ProtocolState::Running);
210
211        coordinator.stop().await.unwrap();
212        assert_eq!(coordinator.state().await, ProtocolState::Stopped);
213    }
214
215    #[tokio::test]
216    async fn test_coordinator_initialization() {
217        let config = ProtocolConfig::default();
218        let coordinator = Coordinator::new(config).await.unwrap();
219
220        // Initially not initialized
221        assert!(!coordinator.is_initialized());
222    }
223}