use std::sync::{atomic::AtomicUsize, Arc};
use chrono::Utc;
use dashmap::DashMap;
use futures_util::{stream::FuturesUnordered, StreamExt};
use lazysort::SortedBy;
use snops_common::{
aot_cmds::Authorization,
state::{CannonId, EnvId, NetworkId},
};
use tracing::{error, trace, warn};
use super::{
error::{CannonError, ExecutionContextError, SourceError},
file::TransactionSink,
sink::TxSink,
source::TxSource,
status::{TransactionSendState, TransactionStatusEvent, TransactionStatusSender},
tracker::TransactionTracker,
CannonReceivers,
};
use crate::{
cannon::source::ComputeTarget,
state::{GlobalState, REST_CLIENT},
};
pub struct ExecutionContext {
pub(crate) state: Arc<GlobalState>,
pub(crate) id: CannonId,
pub(crate) env_id: EnvId,
pub(crate) network: NetworkId,
pub(crate) source: TxSource,
pub(crate) sink: TxSink,
pub(crate) fired_txs: Arc<AtomicUsize>,
pub(crate) transactions: Arc<DashMap<String, TransactionTracker>>,
}
impl ExecutionContext {
pub async fn spawn(self, mut rx: CannonReceivers) -> Result<(), CannonError> {
let ExecutionContext {
id: cannon_id,
env_id,
source,
sink,
fired_txs,
state,
..
} = &self;
let env_id = *env_id;
let env = state
.get_env(env_id)
.ok_or_else(|| ExecutionContextError::EnvDropped(env_id, *cannon_id))?;
trace!("cannon {env_id}.{cannon_id} spawned");
let suffix = format!("/api/v1/env/{}/cannons/{cannon_id}", env.id);
let query_path = match source.compute {
ComputeTarget::Agent { .. } => suffix,
ComputeTarget::Demox { .. } => {
let host = state
.cli
.hostname
.as_ref()
.ok_or(ExecutionContextError::NoHostnameConfigured)?;
format!("{host}:{}{suffix}", state.cli.port)
}
};
trace!("cannon {env_id}.{cannon_id} using realtime query {query_path}");
let sink_pipe = sink
.file_name
.map(|file_name| {
env.sinks.get(&file_name).cloned().ok_or_else(|| {
ExecutionContextError::TransactionSinkNotFound(env_id, *cannon_id, file_name)
})
})
.transpose()?;
let mut auth_execs = FuturesUnordered::new();
let mut tx_shots = FuturesUnordered::new();
loop {
tokio::select! {
Some((tx_id, events)) = rx.authorizations.recv() => {
let Some(tracker) = self.transactions.get(&tx_id) else {
error!("cannon {env_id}.{cannon_id} missing transaction tracker for {tx_id}");
events.send(TransactionStatusEvent::ExecuteAborted);
continue;
};
if tracker.status != TransactionSendState::Authorized {
error!("cannon {env_id}.{cannon_id} unexpected status for {tx_id}: {:?}", tracker.status);
events.send(TransactionStatusEvent::ExecuteAborted);
continue;
}
let Some(auth) = &tracker.authorization else {
error!("cannon {env_id}.{cannon_id} missing authorization for {tx_id}");
events.send(TransactionStatusEvent::ExecuteAborted);
continue;
};
auth_execs.push(self.execute_auth(tx_id, Arc::clone(auth), &query_path, events));
}
Some(tx) = rx.transactions.recv() => {
tx_shots.push(self.fire_tx(sink_pipe.clone(), tx));
}
Some(res) = auth_execs.next() => {
if let Err((tx_id, e)) = res {
warn!("cannon {env_id}.{cannon_id} auth execute task {tx_id} failed: {e}");
}
},
Some(res) = tx_shots.next() => {
match res {
Ok(tx_id) => {
let _fired_count = fired_txs.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
trace!("cannon {env_id}.{cannon_id} broadcasted {tx_id}");
}
Err(e) => {
warn!("cannon {env_id}.{cannon_id} failed to fire transaction {e}");
}
}
},
}
}
}
pub fn write_tx_status(&self, tx_id: &str, status: TransactionSendState) {
let key = (self.env_id, self.id, tx_id.to_owned());
if let Some(mut tx) = self.transactions.get_mut(tx_id) {
if let Err(e) = TransactionTracker::write_status(&self.state, &key, status) {
error!(
"cannon {}.{} failed to write status for {tx_id}: {e}",
self.env_id, self.id
);
}
tx.status = status;
}
}
pub fn remove_tx_tracker(&self, tx_id: String) {
let _ = self.transactions.remove(&tx_id);
if let Err(e) =
TransactionTracker::delete(&self.state, &(self.env_id, self.id, tx_id.to_owned()))
{
error!(
"cannon {}.{} failed to delete transaction {tx_id}: {e:?}",
self.env_id, self.id
);
}
}
async fn execute_auth(
&self,
tx_id: String,
auth: Arc<Authorization>,
query_path: &str,
events: TransactionStatusSender,
) -> Result<(), (String, CannonError)> {
events.send(TransactionStatusEvent::ExecuteQueued);
match self
.source
.compute
.execute(self, query_path, &tx_id, &auth, &events)
.await
{
Err(CannonError::Source(SourceError::NoAvailableAgents(_))) => {
events.send(TransactionStatusEvent::ExecuteAwaitingCompute);
Ok(())
}
Err(e) => {
self.write_tx_status(&tx_id, TransactionSendState::Authorized);
events.send(TransactionStatusEvent::ExecuteFailed(e.to_string()));
Err((tx_id, e))
}
res => res.map_err(|e| (tx_id, e)),
}
}
async fn fire_tx(
&self,
sink_pipe: Option<Arc<TransactionSink>>,
tx_id: String,
) -> Result<String, CannonError> {
let latest_height = self
.state
.get_env_block_info(self.env_id)
.map(|info| info.height);
let Some(tracker) = self.transactions.get(&tx_id).map(|v| v.value().clone()) else {
return Err(CannonError::TransactionLost(self.id, tx_id));
};
if !matches!(
tracker.status,
TransactionSendState::Unsent | TransactionSendState::Broadcasted(_, _)
) {
return Err(CannonError::InvalidTransactionState(
self.id,
tx_id,
format!(
"expected unsent or broadcasted, got {}",
tracker.status.label()
),
));
}
let Some(tx_blob) = tracker.transaction else {
return Err(CannonError::TransactionLost(self.id, tx_id));
};
let tx_str = match serde_json::to_string(&tx_blob) {
Ok(tx_str) => tx_str,
Err(e) => {
return Err(CannonError::Source(SourceError::Json(
"serialize tx for broadcast",
e,
)));
}
};
if let Some(pipe) = sink_pipe {
pipe.write(&tx_str)?;
}
let cannon_id = self.id;
let env_id = self.env_id;
if let Some(target) = &self.sink.target {
let broadcast_nodes = self.state.get_scored_peers(env_id, target);
if broadcast_nodes.is_empty() {
return Err(ExecutionContextError::NoAvailableAgents(
env_id,
cannon_id,
"to broadcast transactions",
)
.into());
}
let network = self.network;
let update_status = || {
self.write_tx_status(
&tx_id,
TransactionSendState::Broadcasted(latest_height, Utc::now()),
);
if let Err(e) = TransactionTracker::inc_attempts(
&self.state,
&(env_id, cannon_id, tx_id.to_owned()),
) {
error!(
"cannon {env_id}.{cannon_id} failed to increment broadcast attempts for {tx_id}: {e}",
);
}
};
for (_, _, agent, addr) in broadcast_nodes.into_iter().sorted_by(|a, b| a.0.cmp(&b.0)) {
if let Some(id) = agent {
let Some(client) = self.state.get_client(id) else {
continue;
};
if let Err(e) = client.broadcast_tx(tx_str.clone()).await {
warn!("cannon {env_id}.{cannon_id} failed to broadcast transaction to agent {id}: {e}");
continue;
}
update_status();
return Ok(tx_id);
}
if let Some(addr) = addr {
let url = format!("http://{addr}/{network}/transaction/broadcast");
let req = REST_CLIENT
.post(url)
.header("Content-Type", "application/json")
.body(tx_str.clone())
.send();
let Ok(res) =
tokio::time::timeout(std::time::Duration::from_secs(5), req).await
else {
warn!("cannon {env_id}.{cannon_id} failed to broadcast transaction to {addr}: timeout");
continue;
};
match res {
Err(e) => {
warn!("cannon {env_id}.{cannon_id} failed to broadcast transaction to {addr}: {e}");
continue;
}
Ok(req) => {
let status = req.status();
if !status.is_success() {
if status.is_server_error()
&& req
.text()
.await
.ok()
.is_some_and(|text| text.contains("exists in the ledger"))
{
return Ok(tx_id);
}
warn!("cannon {env_id}.{cannon_id} failed to broadcast transaction to {addr}: {}", status);
continue;
}
}
}
update_status();
return Ok(tx_id);
}
}
Err(ExecutionContextError::NoAvailableAgents(
env_id,
cannon_id,
"to broadcast transactions",
))?
} else {
self.remove_tx_tracker(tx_id.clone());
}
Ok(tx_id)
}
}