use crate::{
errors::{IndexError, IndexResult},
IDENT,
};
use async_trait::async_trait;
use futures::{
future::FutureExt, select,
};
use kaspa_consensus_notify::{notification as consensus_notification, notification::Notification as ConsensusNotification};
use kaspa_core::trace;
use kaspa_index_core::notification::{Notification, PruningPointUtxoSetOverrideNotification, UtxosChangedNotification};
use kaspa_notify::{
collector::{Collector, CollectorNotificationReceiver},
error::{Error, Result},
events::EventType,
notification::Notification as NotificationTrait,
notifier::DynNotify,
};
use kaspa_utils::triggers::DuplexTrigger;
use kaspa_utxoindex::api::DynUtxoIndexApi;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
/// Converts consensus notifications into index notifications.
///
/// Consensus notifications are read from `recv_channel`, translated by the
/// processor (updating the UTXO index along the way when one is configured)
/// and handed to the notifier supplied when the collecting task is started.
#[derive(Debug)]
pub struct Processor {
/// Optional UTXO index handle. When it is absent, `UtxosChanged`
/// notifications are rejected with `IndexError::NotSupported`.
utxoindex: DynUtxoIndexApi,
/// Receiving end of the channel carrying consensus notifications.
recv_channel: CollectorNotificationReceiver<ConsensusNotification>,
/// Set to `true` while the collecting task is considered started; guards
/// against spawning the task twice and against stopping it twice.
is_started: Arc<AtomicBool>,
/// Request/response trigger pair: `request` asks the collecting task to
/// exit, `response` is fired by the task right before it ends.
collect_shutdown: Arc<DuplexTrigger>,
}
impl Processor {
    /// Creates a processor in the stopped state.
    ///
    /// * `utxoindex` - optional UTXO index updated on every `UtxosChanged` notification.
    /// * `recv_channel` - channel from which consensus notifications are received.
    pub fn new(utxoindex: DynUtxoIndexApi, recv_channel: CollectorNotificationReceiver<ConsensusNotification>) -> Self {
        Self {
            utxoindex,
            recv_channel,
            collect_shutdown: Arc::new(DuplexTrigger::new()),
            is_started: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Spawns the tokio task that forwards processed consensus notifications to `notifier`.
    ///
    /// The call is a no-op when the processor is already marked as started.
    /// The task ends when a shutdown is requested or when the consensus
    /// notification channel can no longer deliver anything; in both cases the
    /// shutdown response trigger is fired before the task returns.
    fn spawn_collecting_task(self: Arc<Self>, notifier: DynNotify<Notification>) {
        // Only the caller that flips the flag from `false` to `true` gets to spawn the task.
        if self.is_started.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst).is_err() {
            return;
        }
        tokio::spawn(async move {
            trace!("[Processor] collecting_task start");
            loop {
                select! {
                    _ = self.collect_shutdown.request.listener.clone().fuse() => break,
                    received = self.recv_channel.recv().fuse() => {
                        let incoming = match received {
                            Ok(incoming) => incoming,
                            Err(err) => {
                                trace!("[Processor] error while receiving a consensus notification: {err:?}");
                                // `recv` only fails once the channel is closed and drained
                                // (async-channel semantics), and then fails immediately on
                                // every subsequent call. Looping again would spin forever
                                // without ever yielding to the runtime, so end the task.
                                break;
                            }
                        };
                        match self.process_notification(incoming) {
                            Ok(notification) => {
                                // A failing notifier is logged but does not stop the collection.
                                if let Err(err) = notifier.notify(notification) {
                                    trace!("[Processor] notification sender error: {err:?}");
                                }
                            }
                            Err(err) => {
                                trace!("[Processor] error while processing a consensus notification: {err:?}");
                            }
                        }
                    }
                }
            }
            // Always acknowledge, so that a pending or later `stop` is released.
            self.collect_shutdown.response.trigger.trigger();
            trace!("[Processor] collecting_task end");
        });
    }

    /// Translates a consensus notification into its index counterpart.
    ///
    /// # Errors
    ///
    /// Returns `IndexError::NotSupported` for every event type other than
    /// `UtxosChanged` and `PruningPointUtxoSetOverride`, and propagates any
    /// error raised while processing a `UtxosChanged` notification.
    fn process_notification(&self, notification: ConsensusNotification) -> IndexResult<Notification> {
        match notification {
            ConsensusNotification::UtxosChanged(utxos_changed) => {
                Ok(Notification::UtxosChanged(self.process_utxos_changed(utxos_changed)?))
            }
            ConsensusNotification::PruningPointUtxoSetOverride(_) => {
                Ok(Notification::PruningPointUtxoSetOverride(PruningPointUtxoSetOverrideNotification {}))
            }
            _ => Err(IndexError::NotSupported(notification.event_type())),
        }
    }

    /// Applies a consensus UTXO diff to the UTXO index and converts the
    /// resulting changes into an index `UtxosChangedNotification`.
    ///
    /// # Errors
    ///
    /// Returns `IndexError::NotSupported(EventType::UtxosChanged)` when no
    /// UTXO index is configured, and propagates any error returned by the
    /// index update.
    fn process_utxos_changed(
        &self,
        notification: consensus_notification::UtxosChangedNotification,
    ) -> IndexResult<UtxosChangedNotification> {
        trace!("[{IDENT}]: processing {:?}", notification);
        match self.utxoindex.as_deref() {
            Some(utxoindex) => {
                // The write guard is a temporary of this statement, so the index
                // lock is released before the changes are converted.
                let changes = utxoindex.write().update(notification.accumulated_utxo_diff.clone(), notification.virtual_parents)?;
                Ok(changes.into())
            }
            None => Err(IndexError::NotSupported(EventType::UtxosChanged)),
        }
    }

    /// Requests the collecting task to stop and waits for its acknowledgement.
    ///
    /// # Errors
    ///
    /// Returns `Error::AlreadyStoppedError` when the processor is not marked
    /// as started.
    async fn stop_collecting_task(&self) -> Result<()> {
        if self.is_started.compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst).is_err() {
            return Err(Error::AlreadyStoppedError);
        }
        self.collect_shutdown.request.trigger.trigger();
        self.collect_shutdown.response.listener.clone().await;
        Ok(())
    }
}
#[async_trait]
impl Collector<Notification> for Processor {
fn start(self: Arc<Self>, notifier: DynNotify<Notification>) {
self.spawn_collecting_task(notifier);
}
async fn stop(self: Arc<Self>) -> Result<()> {
self.stop_collecting_task().await
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use async_channel::{unbounded, Receiver, Sender};
    use kaspa_consensus::{
        config::Config,
        consensus::test_consensus::{create_temp_db, TempDbLifetime, TestConsensus},
        params::DEVNET_PARAMS,
        test_helpers::*,
    };
    use kaspa_consensus_core::utxo::{utxo_collection::UtxoCollection, utxo_diff::UtxoDiff};
    use kaspa_consensusmanager::ConsensusManager;
    use kaspa_notify::notifier::test_helpers::NotifyMock;
    use kaspa_utxoindex::{api::DynUtxoIndexApi, UtxoIndex};
    use rand::{rngs::SmallRng, SeedableRng};
    use std::sync::Arc;

    /// Test fixture wiring a consensus notification sender to a started
    /// `Processor` whose output is captured through a `NotifyMock`.
    ///
    /// Some fields are never read; they are stored so that the resources they
    /// own stay alive for as long as the fixture does.
    #[allow(dead_code)]
    struct NotifyPipeline {
        consensus_sender: Sender<ConsensusNotification>,
        processor: Arc<Processor>,
        processor_receiver: Receiver<Notification>,
        test_consensus: TestConsensus,
        utxoindex_db_lifetime: TempDbLifetime,
    }

    impl NotifyPipeline {
        /// Builds the fixture and starts the processor.
        fn new() -> Self {
            let (consensus_tx, consensus_rx) = unbounded();
            let (db_lifetime, db) = create_temp_db();
            let config = Arc::new(Config::new(DEVNET_PARAMS));
            let test_consensus = TestConsensus::create_from_temp_db_and_dummy_sender(&config);
            test_consensus.init();
            let manager = Arc::new(ConsensusManager::from_consensus(test_consensus.consensus()));
            let utxoindex: DynUtxoIndexApi = Some(UtxoIndex::new(manager, db).unwrap());
            let processor = Arc::new(Processor::new(utxoindex, consensus_rx));
            let (index_tx, index_rx) = unbounded();
            let notifier = Arc::new(NotifyMock::new(index_tx));
            Arc::clone(&processor).start(notifier);
            Self {
                consensus_sender: consensus_tx,
                processor,
                processor_receiver: index_rx,
                test_consensus,
                utxoindex_db_lifetime: db_lifetime,
            }
        }
    }

    /// A `UtxosChanged` consensus notification must come out of the processor
    /// as a `UtxosChanged` index notification carrying exactly the same added
    /// and removed UTXOs.
    #[tokio::test]
    async fn test_utxos_changed_notification() {
        let pipeline = NotifyPipeline::new();
        let rng = &mut SmallRng::seed_from_u64(42);

        // Build a random diff with two added and two removed entries.
        let mut added = UtxoCollection::new();
        let mut removed = UtxoCollection::new();
        for _ in 0..2 {
            added.insert(generate_random_outpoint(rng), generate_random_utxo(rng));
            removed.insert(generate_random_outpoint(rng), generate_random_utxo(rng));
        }
        let diff = Arc::new(UtxoDiff { add: added, remove: removed });
        let parents = Arc::new(generate_random_hashes(rng, 2));
        let sent = consensus_notification::UtxosChangedNotification::new(diff, parents);

        pipeline.consensus_sender.send(ConsensusNotification::UtxosChanged(sent.clone())).await.expect("expected send");

        let received = pipeline.processor_receiver.recv().await.expect("receives a notification");
        match received {
            Notification::UtxosChanged(changes) => {
                // Every added entry must match its source UTXO, and none may be missing.
                let mut added_seen = 0;
                for (spk, entries) in changes.added.iter() {
                    for (outpoint, compact) in entries.iter() {
                        let expected =
                            sent.accumulated_utxo_diff.add.get(outpoint).expect("expected transaction outpoint to be in test event");
                        assert_eq!(expected.script_public_key, *spk);
                        assert_eq!(expected.amount, compact.amount);
                        assert_eq!(expected.block_daa_score, compact.block_daa_score);
                        assert_eq!(expected.is_coinbase, compact.is_coinbase);
                        added_seen += 1;
                    }
                }
                assert_eq!(sent.accumulated_utxo_diff.add.len(), added_seen);

                // Same verification for the removed entries.
                let mut removed_seen = 0;
                for (spk, entries) in changes.removed.iter() {
                    for (outpoint, compact) in entries.iter() {
                        let expected =
                            sent.accumulated_utxo_diff.remove.get(outpoint).expect("expected transaction outpoint to be in test event");
                        assert_eq!(expected.script_public_key, *spk);
                        assert_eq!(expected.amount, compact.amount);
                        assert_eq!(expected.block_daa_score, compact.block_daa_score);
                        assert_eq!(expected.is_coinbase, compact.is_coinbase);
                        removed_seen += 1;
                    }
                }
                assert_eq!(sent.accumulated_utxo_diff.remove.len(), removed_seen);
            }
            unexpected_notification => panic!("Unexpected notification: {unexpected_notification:?}"),
        }

        assert!(pipeline.processor_receiver.is_empty(), "the notification receiver should be empty");
        Arc::clone(&pipeline.processor).stop().await.expect("stopping the processor must succeed");
    }

    /// A `PruningPointUtxoSetOverride` consensus notification must be relayed
    /// as the index notification of the same kind.
    #[tokio::test]
    async fn test_pruning_point_utxo_set_override_notification() {
        let pipeline = NotifyPipeline::new();
        let sent = consensus_notification::PruningPointUtxoSetOverrideNotification {};
        let outgoing = ConsensusNotification::PruningPointUtxoSetOverride(sent.clone());
        pipeline.consensus_sender.send(outgoing).await.expect("expected send");

        let received = pipeline.processor_receiver.recv().await.expect("expected recv");
        match received {
            Notification::PruningPointUtxoSetOverride(_) => (),
            unexpected_notification => panic!("Unexpected notification: {unexpected_notification:?}"),
        }

        assert!(pipeline.processor_receiver.is_empty(), "the notification receiver should be empty");
        Arc::clone(&pipeline.processor).stop().await.expect("stopping the processor must succeed");
    }
}