use std::sync::Arc;
use async_trait::async_trait;
use crate::error::WalletError;
use crate::monitor::helpers::attempt_to_post_reqs_to_network;
use crate::monitor::{AsyncCallback, ONE_MINUTE, ONE_SECOND};
use crate::services::traits::WalletServices;
use crate::status::ProvenTxReqStatus;
use crate::storage::find_args::{FindProvenTxReqsArgs, Paged, ProvenTxReqPartial};
use crate::storage::manager::WalletStorageManager;
use crate::types::Chain;
use super::super::task_trait::WalletMonitorTask;
/// Monitor task that looks up proven-tx requests in `Unsent` status (and,
/// on some runs, `Sending` status) and hands the ones that have not been
/// updated for at least `aged_msecs` to `attempt_to_post_reqs_to_network`.
pub struct TaskSendWaiting {
/// Storage manager queried for pending requests and passed to the posting helper.
storage: Arc<WalletStorageManager>,
/// Services handle passed to the posting helper.
services: Arc<dyn WalletServices>,
/// Milliseconds that must elapse after `last_run_msecs` before `trigger`
/// returns `true` again.
trigger_msecs: u64,
/// Only requests whose `updated_at` is more than this many milliseconds in
/// the past are posted.
aged_msecs: u64,
/// Milliseconds that must elapse after `last_sending_run_msecs` before
/// `Sending` requests are included again.
sending_msecs: u64,
/// Timestamp (ms since epoch) recorded by `trigger` the last time it
/// returned `true`; starts at 0.
last_run_msecs: u64,
/// Timestamp (ms since epoch) recorded by `trigger` the last time it set
/// `include_sending` to `true`; `None` until that first happens.
last_sending_run_msecs: Option<u64>,
/// Whether `run_task` queries `Sending` requests in addition to `Unsent`
/// ones. Recomputed by `trigger`.
include_sending: bool,
/// Optional callback awaited with the txid of every request whose post
/// result is `PostReqStatus::Success`.
on_tx_broadcasted: Option<AsyncCallback<String>>,
/// Chain supplied at construction; stored but not read by the code visible
/// in this file.
_chain: Chain,
}
impl TaskSendWaiting {
pub fn new(
storage: Arc<WalletStorageManager>,
services: Arc<dyn WalletServices>,
chain: Chain,
on_tx_broadcasted: Option<AsyncCallback<String>>,
) -> Self {
Self {
storage,
services,
trigger_msecs: ONE_SECOND * 8,
aged_msecs: ONE_SECOND * 7,
sending_msecs: ONE_MINUTE * 5,
last_run_msecs: 0,
last_sending_run_msecs: None,
include_sending: true,
on_tx_broadcasted,
_chain: chain,
}
}
pub fn with_intervals(
storage: Arc<WalletStorageManager>,
services: Arc<dyn WalletServices>,
chain: Chain,
trigger_msecs: u64,
aged_msecs: u64,
sending_msecs: u64,
on_tx_broadcasted: Option<AsyncCallback<String>>,
) -> Self {
Self {
storage,
services,
trigger_msecs,
aged_msecs,
sending_msecs,
last_run_msecs: 0,
last_sending_run_msecs: None,
include_sending: true,
on_tx_broadcasted,
_chain: chain,
}
}
}
#[async_trait]
impl WalletMonitorTask for TaskSendWaiting {
    /// Returns the storage manager this task reads from.
    fn storage_manager(&self) -> Option<&WalletStorageManager> {
        Some(&self.storage)
    }

    /// Returns the fixed task name, `"SendWaiting"`.
    fn name(&self) -> &str {
        "SendWaiting"
    }

    /// Decides whether the task should run at `now_msecs_since_epoch`.
    ///
    /// Returns `true` (and records the time in `last_run_msecs`) when more
    /// than `trigger_msecs` have elapsed since the last triggered run.
    ///
    /// The `Sending` window is evaluated only when a run is actually
    /// triggered. Evaluating it on every poll would let a poll that does not
    /// run consume the window: `last_sending_run_msecs` would advance, the
    /// next poll would reset `include_sending` to `false`, and the `Sending`
    /// retry would be skipped for another full `sending_msecs`.
    fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
        // Saturating adds: the intervals are caller-supplied, so a very large
        // value must read as "never due" rather than overflow.
        let run_due =
            now_msecs_since_epoch > self.last_run_msecs.saturating_add(self.trigger_msecs);
        if !run_due {
            return false;
        }
        self.last_run_msecs = now_msecs_since_epoch;

        self.include_sending = match self.last_sending_run_msecs {
            None => true,
            Some(last) => now_msecs_since_epoch > last.saturating_add(self.sending_msecs),
        };
        if self.include_sending {
            self.last_sending_run_msecs = Some(now_msecs_since_epoch);
        }
        true
    }

    /// Pages through the pending proven-tx requests, posts the aged ones and
    /// returns a human-readable log of what was done.
    ///
    /// Requests are queried with status `Unsent`, plus `Sending` when
    /// `include_sending` is set. Within each page only requests whose
    /// `updated_at` is older than `aged_msecs` are passed to
    /// `attempt_to_post_reqs_to_network`. For every post detail reporting
    /// `PostReqStatus::Success` the optional `on_tx_broadcasted` callback is
    /// awaited with the txid.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `find_proven_tx_reqs` or
    /// `attempt_to_post_reqs_to_network`.
    async fn run_task(&mut self) -> Result<String, WalletError> {
        const PAGE_SIZE: i64 = 100;

        let mut log = String::new();

        // Clamp before converting so an oversized `aged_msecs` cannot wrap to
        // a negative duration. `checked_sub_signed` yields `None` when the
        // cutoff is not representable; in that case nothing counts as aged.
        let aged_msecs = self.aged_msecs.min(i64::MAX as u64) as i64;
        let aged_limit = chrono::Utc::now()
            .naive_utc()
            .checked_sub_signed(chrono::Duration::milliseconds(aged_msecs));

        let statuses = if self.include_sending {
            vec![ProvenTxReqStatus::Unsent, ProvenTxReqStatus::Sending]
        } else {
            vec![ProvenTxReqStatus::Unsent]
        };
        // The label is the same for every page, so build it once.
        let status_label = statuses
            .iter()
            .map(|s| s.to_string())
            .collect::<Vec<_>>()
            .join(" or ");

        // NOTE(review): paging is offset-based over a status filter. If
        // posting moves requests out of the queried statuses, later pages may
        // skip some requests until the next run — confirm against the helper.
        let mut offset = 0i64;
        loop {
            let args = FindProvenTxReqsArgs {
                partial: ProvenTxReqPartial::default(),
                statuses: Some(statuses.clone()),
                paged: Some(Paged {
                    limit: PAGE_SIZE,
                    offset,
                }),
                since: None,
            };
            let reqs = self.storage.find_proven_tx_reqs(&args).await?;
            let page_len = reqs.len();
            if page_len == 0 {
                break;
            }
            log.push_str(&format!(
                "{} reqs with status {}\n",
                page_len, status_label
            ));

            let aged_reqs: Vec<_> = reqs
                .into_iter()
                .filter(|req| aged_limit.map_or(false, |limit| req.updated_at < limit))
                .collect();
            log.push_str(&format!(
                " Of those, {} aged past {} ms.\n",
                aged_reqs.len(),
                self.aged_msecs
            ));

            if !aged_reqs.is_empty() {
                let post_result = attempt_to_post_reqs_to_network(
                    &self.storage,
                    self.services.as_ref(),
                    &aged_reqs,
                )
                .await?;
                log.push_str(&post_result.log);

                // Surface cascade failures in the log; they do not abort the run.
                for detail in &post_result.details {
                    if detail.cascade_update_failed {
                        log.push_str(&format!(
                            " WARN txid {} req {} cascade tx-status update \
                             failed; outputs may be temporarily hidden until \
                             next successful write\n",
                            detail.txid, detail.req_id
                        ));
                    }
                }

                if let Some(ref cb) = self.on_tx_broadcasted {
                    for detail in &post_result.details {
                        if detail.status == crate::monitor::helpers::PostReqStatus::Success {
                            cb(detail.txid.clone()).await;
                        }
                    }
                }
            }

            // A short page means there is nothing left to fetch.
            if (page_len as i64) < PAGE_SIZE {
                break;
            }
            offset += PAGE_SIZE;
        }

        Ok(log)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the millisecond values of the interval expressions that
    /// `TaskSendWaiting::new` uses as defaults.
    #[test]
    fn test_default_intervals() {
        let expectations = [
            (ONE_SECOND * 8, 8000),
            (ONE_SECOND * 7, 7000),
            (ONE_MINUTE * 5, 300_000),
        ];
        for &(actual, expected) in expectations.iter() {
            assert_eq!(actual, expected);
        }
    }

    /// Checks the expected task name string.
    ///
    /// NOTE(review): this compares a literal with itself and never calls
    /// `WalletMonitorTask::name`; exercising the real method needs a
    /// constructible `TaskSendWaiting`.
    #[test]
    fn test_task_name() {
        let expected_name = "SendWaiting";
        assert_eq!("SendWaiting", expected_name);
    }
}