use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use miden_protocol::account::AccountId;
use tokio::sync::{Notify, Semaphore};
use tokio::task::JoinSet;
use crate::actor::{AccountActor, AccountActorContext};
use crate::committed_block::CommittedBlockEffects;
#[derive(Clone)]
struct ActorHandle {
notify: Arc<Notify>,
}
impl ActorHandle {
fn new(notify: Arc<Notify>) -> Self {
Self { notify }
}
fn notify(&self) {
self.notify.notify_one();
}
fn has_pending_notification(&self) -> bool {
use futures::FutureExt;
if self.notify.notified().now_or_never().is_some() {
self.notify.notify_one();
true
} else {
false
}
}
}
pub struct Coordinator {
actor_registry: HashMap<AccountId, ActorHandle>,
actor_join_set: JoinSet<(AccountId, anyhow::Result<()>)>,
semaphore: Arc<Semaphore>,
actor_context: AccountActorContext,
crash_counts: HashMap<AccountId, usize>,
max_account_crashes: usize,
}
impl Coordinator {
pub fn new(
max_inflight_transactions: usize,
max_account_crashes: usize,
actor_context: AccountActorContext,
) -> Self {
Self {
actor_registry: HashMap::new(),
actor_join_set: JoinSet::new(),
semaphore: Arc::new(Semaphore::new(max_inflight_transactions)),
actor_context,
crash_counts: HashMap::new(),
max_account_crashes,
}
}
#[tracing::instrument(name = "ntx.builder.spawn_actor", skip(self))]
pub fn spawn_actor(&mut self, account_id: AccountId) {
if let Some(&count) = self.crash_counts.get(&account_id)
&& count >= self.max_account_crashes
{
tracing::warn!(
account.id = %account_id,
crash_count = count,
"Account deactivated due to repeated crashes, skipping actor spawn"
);
return;
}
if self.actor_registry.contains_key(&account_id) {
tracing::error!(
account.id = %account_id,
"Account actor already exists",
);
return;
}
let notify = Arc::new(Notify::new());
let actor = AccountActor::new(account_id, &self.actor_context, notify.clone());
let handle = ActorHandle::new(notify);
let semaphore = self.semaphore.clone();
self.actor_join_set
.spawn(Box::pin(async move { (account_id, actor.run(semaphore).await) }));
self.actor_registry.insert(account_id, handle);
tracing::info!(account.id = %account_id, "Created actor for account");
}
pub fn handle_committed_block(&mut self, effects: &CommittedBlockEffects) {
let mut targeted: HashSet<AccountId> = HashSet::new();
for note in &effects.network_notes {
targeted.insert(note.target_account_id());
}
for account_id in &targeted {
if !self.actor_registry.contains_key(account_id) {
self.spawn_actor(*account_id);
}
}
for handle in self.actor_registry.values() {
handle.notify();
}
}
pub async fn next(&mut self) -> anyhow::Result<Option<AccountId>> {
let actor_result = self.actor_join_set.join_next().await;
match actor_result {
Some(Ok((account_id, Ok(())))) => {
let should_respawn = self
.actor_registry
.remove(&account_id)
.is_some_and(|handle| handle.has_pending_notification());
Ok(should_respawn.then_some(account_id))
},
Some(Ok((account_id, Err(err)))) => {
let count = self.crash_counts.entry(account_id).or_insert(0);
*count += 1;
tracing::error!(
account.id = %account_id,
"Account actor crashed: {err:#}"
);
self.actor_registry.remove(&account_id);
Ok(None)
},
Some(Err(err)) => {
tracing::error!(err = %err, "actor task failed");
Ok(None)
},
None => {
std::future::pending().await
},
}
}
}
#[cfg(test)]
impl Coordinator {
pub async fn test()
-> (Self, tempfile::TempDir, tokio::sync::mpsc::Receiver<crate::actor::ActorRequest>) {
use crate::db::Db;
let (db, dir) = Db::test_setup().await;
let (tx, rx) = tokio::sync::mpsc::channel(8);
let mut actor_context = AccountActorContext::test(&db);
actor_context.request_tx = tx;
(Self::new(4, 10, actor_context), dir, rx)
}
}
#[cfg(test)]
mod tests {
use futures::FutureExt;
use super::*;
use crate::test_utils::*;
fn register_dummy_actor(coordinator: &mut Coordinator, account_id: AccountId) {
let notify = Arc::new(Notify::new());
coordinator.actor_registry.insert(account_id, ActorHandle::new(notify));
}
#[tokio::test]
async fn handle_committed_block_spawns_for_unknown_note_target() {
let (mut coordinator, _dir, _rx) = Coordinator::test().await;
let unknown_id = mock_network_account_id();
let note = mock_single_target_note(unknown_id, 10);
let effects = CommittedBlockEffects {
header: mock_block_header(1_u32.into()),
network_notes: vec![note],
nullifiers: vec![],
network_account_updates: vec![],
account_transactions: vec![],
};
coordinator.handle_committed_block(&effects);
assert!(
coordinator.actor_registry.contains_key(&unknown_id),
"previously-untouched account targeted by a note should get a fresh actor",
);
}
#[tokio::test]
async fn handle_committed_block_does_not_spawn_for_account_update_only() {
let (mut coordinator, _dir, _rx) = Coordinator::test().await;
let updated_id = mock_network_account_id();
let effects = CommittedBlockEffects {
header: mock_block_header(1_u32.into()),
network_notes: vec![],
nullifiers: vec![],
network_account_updates: vec![(
updated_id,
miden_protocol::account::delta::AccountUpdateDetails::Private,
)],
account_transactions: vec![],
};
coordinator.handle_committed_block(&effects);
assert!(
!coordinator.actor_registry.contains_key(&updated_id),
"an account update without a new note should not trigger an actor spawn",
);
}
#[tokio::test]
async fn spawn_actor_skips_deactivated_account() {
let (mut coordinator, _dir, _rx) = Coordinator::test().await;
let account_id = mock_network_account_id();
coordinator.crash_counts.insert(account_id, coordinator.max_account_crashes);
coordinator.spawn_actor(account_id);
assert!(
!coordinator.actor_registry.contains_key(&account_id),
"deactivated account should not have an actor in the registry",
);
}
#[tokio::test]
async fn spawn_actor_allows_below_threshold() {
let (mut coordinator, _dir, _rx) = Coordinator::test().await;
let account_id = mock_network_account_id();
coordinator
.crash_counts
.insert(account_id, coordinator.max_account_crashes.saturating_sub(1));
coordinator.spawn_actor(account_id);
assert!(
coordinator.actor_registry.contains_key(&account_id),
"account below crash threshold should have an actor in the registry",
);
}
#[tokio::test]
async fn handle_committed_block_notifies_existing_actors() {
let (mut coordinator, _dir, _rx) = Coordinator::test().await;
let bystander = mock_network_account_id();
register_dummy_actor(&mut coordinator, bystander);
let bystander_notify = coordinator.actor_registry.get(&bystander).unwrap().notify.clone();
let target = mock_network_account_id_seeded(42);
let note = mock_single_target_note(target, 10);
let effects = CommittedBlockEffects {
header: mock_block_header(1_u32.into()),
network_notes: vec![note],
nullifiers: vec![],
network_account_updates: vec![],
account_transactions: vec![],
};
coordinator.handle_committed_block(&effects);
assert!(
bystander_notify.notified().now_or_never().is_some(),
"every registered actor should be notified on a committed block",
);
assert!(
coordinator.actor_registry.contains_key(&target),
"freshly-targeted account should get an actor",
);
}
}