use std::time::Duration;
use log::*;
use tari_comms::{
Substream,
connectivity::ConnectivityRequester,
protocol::{ProtocolExtension, ProtocolExtensionContext, ProtocolExtensionError, ProtocolNotification},
};
use tari_service_framework::{ServiceInitializationError, ServiceInitializer, ServiceInitializerContext, async_trait};
use tokio::{sync::mpsc, time::sleep};
use crate::{
base_node::{StateMachineHandle, comms_interface::LocalNodeCommsInterface},
mempool::{
Mempool,
MempoolServiceConfig,
sync_protocol::{MEMPOOL_SYNC_PROTOCOL, MempoolSyncProtocol},
},
};
/// Log target passed as `target:` to every `log` macro invocation in this file.
const LOG_TARGET: &str = "c::mempool::sync_protocol";
/// Service initializer that wires up and starts the mempool sync protocol.
///
/// It owns both ends of the protocol notification channel: clones of the sender are given out
/// through `get_protocol_extension`, while the single receiver is moved into the task spawned
/// by `initialize`.
pub struct MempoolSyncInitializer {
/// Configuration cloned and passed to `MempoolSyncProtocol::new` in `initialize`.
config: MempoolServiceConfig,
/// Mempool handle cloned and passed to `MempoolSyncProtocol::new` in `initialize`.
mempool: Mempool,
/// Receiving end of the notification channel. It is `Some` after `new` and is taken (left as
/// `None`) by `initialize`.
notif_rx: Option<mpsc::Receiver<ProtocolNotification<Substream>>>,
/// Sending end of the notification channel; cloned for each protocol extension handed out.
notif_tx: mpsc::Sender<ProtocolNotification<Substream>>,
}
impl MempoolSyncInitializer {
    /// Creates a new initializer from the given mempool service configuration and mempool.
    ///
    /// A bounded notification channel with capacity 3 is created here. The sender is kept so
    /// that it can be cloned by [`Self::get_protocol_extension`]; the receiver is stored until
    /// `initialize` takes it.
    pub fn new(config: MempoolServiceConfig, mempool: Mempool) -> Self {
        let (notif_tx, notif_rx) = mpsc::channel(3);
        Self {
            mempool,
            config,
            notif_tx,
            notif_rx: Some(notif_rx),
        }
    }

    /// Returns a protocol extension that registers [`MEMPOOL_SYNC_PROTOCOL`] on the supplied
    /// [`ProtocolExtensionContext`], together with a clone of this initializer's notification
    /// sender.
    ///
    /// The returned closure owns its own sender clone, so (as declared by `use<>`) it does not
    /// capture the `&self` borrow.
    pub fn get_protocol_extension(&self) -> impl ProtocolExtension + use<> {
        let notif_tx = self.notif_tx.clone();
        move |context: &mut ProtocolExtensionContext| -> Result<(), ProtocolExtensionError> {
            // The sender is passed by reference; the closure keeps ownership of its clone.
            context.add_protocol([MEMPOOL_SYNC_PROTOCOL.clone()], &notif_tx);
            Ok(())
        }
    }
}
#[async_trait]
impl ServiceInitializer for MempoolSyncInitializer {
    /// Spawns the mempool sync protocol task through `context.spawn_until_shutdown`.
    ///
    /// The spawned task looks up the state machine, connectivity and base node handles, then
    /// holds off until the state machine's status watch reports a synced state (or the watch
    /// stops yielding changes). Only after that does it subscribe to the base node block event
    /// stream and run [`MempoolSyncProtocol`].
    ///
    /// # Panics
    ///
    /// Panics if the notification receiver has already been taken, i.e. if `initialize` is
    /// called more than once on the same initializer.
    async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> {
        trace!(target: LOG_TARGET, "Initializing Mempool Sync Service");

        let sync_config = self.config.clone();
        let pool = self.mempool.clone();
        // There is exactly one receiver; it is moved into the task spawned below.
        let notifications = self.notif_rx.take().unwrap();

        context.spawn_until_shutdown(move |handles| async move {
            let state_machine = handles.expect_handle::<StateMachineHandle>();
            let connectivity = handles.expect_handle::<ConnectivityRequester>();
            let base_node = handles.expect_handle::<LocalNodeCommsInterface>();

            let mut status_rx = state_machine.get_status_info_watch();
            let already_synced = status_rx.borrow().state_info.is_synced();
            if !already_synced {
                debug!(target: LOG_TARGET, "Waiting for node to do initial sync...");
                loop {
                    // If the watch can no longer report changes, stop waiting and fall through
                    // to starting the protocol.
                    if status_rx.changed().await.is_err() {
                        break;
                    }

                    let synced_now = status_rx.borrow().state_info.is_synced();
                    if synced_now {
                        debug!(
                            target: LOG_TARGET,
                            "Initial sync is done. Starting mempool sync protocol"
                        );
                        break;
                    }

                    trace!(
                        target: LOG_TARGET,
                        "Mempool sync still on hold, waiting for node to do initial sync",
                    );
                    // Back off before looking at the next status change.
                    sleep(Duration::from_secs(30)).await;
                }
            }

            // Subscribe to block events only once the wait above is over.
            let block_events = base_node.get_block_event_stream();
            MempoolSyncProtocol::new(sync_config, notifications, pool, connectivity, block_events)
                .run()
                .await;
        });

        trace!(target: LOG_TARGET, "Mempool sync service initialized");
        Ok(())
    }
}