use std::sync::Arc;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use snops_common::{
aot_cmds::Authorization, lasso::Spur, node_targets::NodeTargets, state::NetworkId, INTERN,
};
use tracing::error;
use super::{
error::{CannonError, SourceError},
net::get_available_port,
status::{TransactionSendState, TransactionStatusEvent, TransactionStatusSender},
tracker::TransactionTracker,
ExecutionContext,
};
use crate::env::set::find_compute_agent;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LocalService {
#[serde(skip_serializing_if = "Option::is_none")]
pub sync_from: Option<NodeTargets>,
}
impl LocalService {
pub async fn get_state_root(
&self,
network: NetworkId,
port: u16,
) -> Result<String, CannonError> {
let url = format!("http://127.0.0.1:{port}/{network}/latest/stateRoot");
let response = reqwest::get(&url)
.await
.map_err(|e| SourceError::FailedToGetStateRoot(url, e))?;
Ok(response
.json()
.await
.map_err(SourceError::StateRootInvalidJson)?)
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case", untagged)]
pub enum QueryTarget {
Node(NodeTargets),
Local(LocalService),
}
impl Default for QueryTarget {
fn default() -> Self {
QueryTarget::Local(LocalService { sync_from: None })
}
}
fn deser_labels<'de, D>(deser: D) -> Result<Option<Vec<Spur>>, D::Error>
where
D: serde::Deserializer<'de>,
{
Ok(Option::<Vec<String>>::deserialize(deser)?.map(|s| {
s.into_iter()
.map(|s| INTERN.get_or_intern(s))
.collect::<Vec<Spur>>()
}))
}
fn ser_labels<S>(labels: &Option<Vec<Spur>>, ser: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match labels {
Some(labels) => {
let labels = labels
.iter()
.map(|s| INTERN.resolve(s))
.collect::<Vec<&str>>();
serde::Serialize::serialize(&labels, ser)
}
None => serde::Serialize::serialize(&None::<String>, ser),
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case", untagged)]
pub enum ComputeTarget {
Agent {
#[serde(
default,
deserialize_with = "deser_labels",
serialize_with = "ser_labels",
skip_serializing_if = "Option::is_none"
)]
labels: Option<Vec<Spur>>,
},
#[serde(rename_all = "kebab-case")]
Demox { demox_api: String },
}
impl Default for ComputeTarget {
fn default() -> Self {
ComputeTarget::Agent { labels: None }
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub struct TxSource {
#[serde(default)]
pub query: QueryTarget,
#[serde(default)]
pub compute: ComputeTarget,
}
impl TxSource {
pub fn get_query_port(&self) -> Result<Option<u16>, CannonError> {
if !matches!(self.query, QueryTarget::Local(_)) {
return Ok(None);
}
Ok(Some(
get_available_port().ok_or(SourceError::TxSourceUnavailablePort)?,
))
}
}
impl ComputeTarget {
pub async fn execute(
&self,
ctx: &ExecutionContext,
query_path: &str,
tx_id: &str,
auth: &Authorization,
events: &TransactionStatusSender,
) -> Result<(), CannonError> {
match self {
ComputeTarget::Agent { labels } => {
let (agent_id, client, _busy) =
find_compute_agent(&ctx.state, &labels.clone().unwrap_or_default())
.ok_or(SourceError::NoAvailableAgents("authorization"))?;
events.send(TransactionStatusEvent::Executing(agent_id));
ctx.write_tx_status(tx_id, TransactionSendState::Executing(Utc::now()));
if let Err(e) = TransactionTracker::inc_attempts(
&ctx.state,
&(ctx.env_id, ctx.id, tx_id.to_owned()),
) {
error!(
"cannon {}.{} failed to increment auth attempts for {tx_id}: {e}",
ctx.env_id, ctx.id
);
}
let transaction_json = client
.execute_authorization(
ctx.env_id,
ctx.network,
query_path.to_owned(),
serde_json::to_string(&auth)
.map_err(|e| SourceError::Json("authorize tx", e))?,
)
.await?;
let transaction = match serde_json::from_str::<Arc<Value>>(&transaction_json) {
Ok(transaction) => transaction,
Err(e) => {
events.send(TransactionStatusEvent::ExecuteFailed(format!(
"failed to parse transaction JSON: {transaction_json}",
)));
return Err(CannonError::Source(SourceError::Json(
"parse compute tx",
e,
)));
}
};
let key = (ctx.env_id, ctx.id, tx_id.to_owned());
if let Some(mut tx) = ctx.transactions.get_mut(tx_id) {
if let Err(e) = TransactionTracker::write_status(
&ctx.state,
&key,
TransactionSendState::Unsent,
) {
error!(
"cannon {}.{} failed to write status after auth for {tx_id}: {e}",
ctx.env_id, ctx.id
);
}
if let Err(e) = TransactionTracker::write_tx(&ctx.state, &key, &transaction) {
error!(
"cannon {}.{} failed to write tx json after auth for {tx_id}: {e}",
ctx.env_id, ctx.id
);
}
if let Err(e) = TransactionTracker::clear_attempts(
&ctx.state,
&(ctx.env_id, ctx.id, tx_id.to_owned()),
) {
tracing::error!(
"cannon {}.{} failed to clear auth attempts for {tx_id}: {e}",
ctx.env_id,
ctx.id
);
}
tx.status = TransactionSendState::Unsent;
tx.transaction = Some(Arc::clone(&transaction));
}
events.send(TransactionStatusEvent::ExecuteComplete(transaction));
Ok(())
}
ComputeTarget::Demox { demox_api: url } => match auth {
Authorization::Program { auth, fee_auth } => {
let _body = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "generateTransaction",
"params": {
"authorization": serde_json::to_string(&auth).map_err(|e| SourceError::Json("authorize tx", e))?,
"fee": serde_json::to_string(&fee_auth).map_err(|e| SourceError::Json("authorize fee", e))?,
"url": query_path,
"broadcast": true,
}
});
todo!("post on {url}")
}
Authorization::Deploy {
owner: _,
deployment: _,
fee_auth: _,
} => {
unimplemented!()
}
},
}
}
}