use crate::{
config::Config as ProtocolConfig,
message::Message,
node::{Node, NodeConfig},
types::{ProtocolError, ProtocolEvent, ProtocolState},
};
use qudag_crypto::KeyPair;
use qudag_dag::QrDag;
use qudag_network::NetworkManager;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info};
pub struct Coordinator {
node: Node,
#[allow(dead_code)]
config: ProtocolConfig,
state: Arc<RwLock<ProtocolState>>,
#[allow(dead_code)]
events: CoordinatorEvents,
crypto: Option<KeyPair>,
network: Option<NetworkManager>,
dag: Option<QrDag>,
}
struct CoordinatorEvents {
#[allow(dead_code)]
tx: mpsc::Sender<ProtocolEvent>,
#[allow(dead_code)]
rx: mpsc::Receiver<ProtocolEvent>,
}
impl Coordinator {
pub async fn new(config: ProtocolConfig) -> Result<Self, ProtocolError> {
let node_config = NodeConfig {
data_dir: config.node.data_dir.clone(),
network_port: config.network.port,
max_peers: config.network.max_peers,
initial_peers: Vec::new(),
};
let node = Node::new(node_config).await?;
let (tx, rx) = mpsc::channel(1000);
Ok(Self {
node,
config,
state: Arc::new(RwLock::new(ProtocolState::Initial)),
events: CoordinatorEvents { tx, rx },
crypto: None,
network: None,
dag: None,
})
}
pub async fn start(&mut self) -> Result<(), ProtocolError> {
info!("Starting protocol coordinator...");
self.init_crypto().await?;
self.init_network().await?;
self.init_dag().await?;
self.node.start().await?;
{
let mut state = self.state.write().await;
*state = ProtocolState::Running;
}
info!("Protocol coordinator started successfully");
Ok(())
}
pub async fn stop(&mut self) -> Result<(), ProtocolError> {
info!("Stopping protocol coordinator...");
{
let mut state = self.state.write().await;
*state = ProtocolState::Stopping;
}
self.node.stop().await?;
{
let mut state = self.state.write().await;
*state = ProtocolState::Stopped;
}
info!("Protocol coordinator stopped successfully");
Ok(())
}
pub async fn state(&self) -> ProtocolState {
self.state.read().await.clone()
}
pub fn is_initialized(&self) -> bool {
self.crypto.is_some() && self.network.is_some() && self.dag.is_some()
}
pub async fn broadcast_message(&mut self, message: Vec<u8>) -> Result<(), ProtocolError> {
debug!("Broadcasting message of {} bytes", message.len());
let proto_message =
Message::new(crate::message::MessageType::Data(message.clone()), vec![]);
if let Some(ref _crypto) = self.crypto {
}
self.node
.handle_message(proto_message)
.await
.map_err(|e| ProtocolError::Internal(e.to_string()))?;
if let Some(ref mut dag) = self.dag {
dag.add_message(message)
.map_err(|e| ProtocolError::Internal(e.to_string()))?;
}
Ok(())
}
pub fn crypto_manager(&self) -> Option<&KeyPair> {
self.crypto.as_ref()
}
pub fn network_manager(&self) -> Option<&NetworkManager> {
self.network.as_ref()
}
pub fn dag_manager(&self) -> Option<&QrDag> {
self.dag.as_ref()
}
async fn init_crypto(&mut self) -> Result<(), ProtocolError> {
self.crypto = Some(KeyPair::new());
Ok(())
}
async fn init_network(&mut self) -> Result<(), ProtocolError> {
self.network = Some(NetworkManager::new());
Ok(())
}
async fn init_dag(&mut self) -> Result<(), ProtocolError> {
self.dag = Some(QrDag::new());
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_coordinator_lifecycle() {
let config = ProtocolConfig::default();
let mut coordinator = Coordinator::new(config).await.unwrap();
assert_eq!(coordinator.state().await, ProtocolState::Initial);
coordinator.start().await.unwrap();
assert_eq!(coordinator.state().await, ProtocolState::Running);
coordinator.stop().await.unwrap();
assert_eq!(coordinator.state().await, ProtocolState::Stopped);
}
#[tokio::test]
async fn test_coordinator_initialization() {
let config = ProtocolConfig::default();
let coordinator = Coordinator::new(config).await.unwrap();
assert!(!coordinator.is_initialized());
}
}