1use std::{
2 collections::{HashMap, HashSet},
3 sync::Arc,
4 time::Duration,
5};
6
7use anyhow::anyhow;
8use chrono::{Local, TimeZone};
9use clap::{Args, Subcommand};
10use clock::SYSVAR_CLOCK;
11use futures::stream::StreamExt;
12use itertools::Itertools;
13use serde::Serialize;
14use solana_client::{
15 rpc_client::GetConfirmedSignaturesForAddress2Config,
16 rpc_config::{RpcSimulateTransactionConfig, RpcTransactionConfig},
17};
18use solana_sdk::{
19 commitment_config::CommitmentLevel,
20 message::{v0, VersionedMessage},
21 pubkey::Pubkey,
22 signer::Signer,
23 transaction::VersionedTransaction,
24};
25use solana_transaction_status_client_types::UiTransactionEncoding;
26use solana_transaction_utils::{
27 pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
28};
29use tuktuk_program::{
30 types::{QueueTaskArgsV0, TriggerV0},
31 TaskQueueV0, TaskV0,
32};
33use tuktuk_sdk::prelude::*;
34
35use super::{task_queue::TaskQueueArg, TransactionSource};
36use crate::{
37 client::{send_instructions, CliClient},
38 cmd::Opts,
39 result::Result,
40 serde::{print_json, serde_pubkey},
41};
42
// Top-level arguments for the task commands: a verbosity flag shared by all
// subcommands plus the subcommand to execute.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments
// into help text, which would change the CLI's `--help` output.
#[derive(Debug, Args)]
pub struct TaskCmd {
    // When set, `List` includes each task's transaction source in its JSON output.
    #[arg(long, default_value = "false")]
    pub verbose: bool,
    // The task operation to perform.
    #[command(subcommand)]
    pub cmd: Cmd,
}
50
51#[derive(Debug, Subcommand)]
52pub enum Cmd {
53 List {
54 #[command(flatten)]
55 task_queue: TaskQueueArg,
56 #[arg(long)]
58 description: Option<String>,
59 #[arg(long, default_value = "false")]
60 skip_simulate: bool,
61 #[arg(
62 long,
63 help = "Only show tasks that could be executed now",
64 default_value = "false"
65 )]
66 active: bool,
67 #[arg(long, help = "Show tasks with a succesful/failed simulation")]
68 successful: Option<bool>,
69 #[arg(long, help = "Limit the number of tasks returned")]
70 limit: Option<u32>,
71 },
72 Run {
73 #[command(flatten)]
74 task_queue: TaskQueueArg,
75 #[arg(short, long)]
76 id: Option<u16>,
77 #[arg(long)]
79 description: Option<String>,
80 #[arg(short, long, default_value = "false")]
81 skip_preflight: bool,
82 },
83 Requeue {
84 #[command(flatten)]
85 task_queue: TaskQueueArg,
86 #[arg(short, long)]
87 id: Option<u16>,
88 #[arg(short, long, default_value = "false", help = "Requeue all stale tasks")]
89 stale: bool,
90 #[arg(long)]
91 description: Option<String>,
92 #[arg(long)]
93 after_id: Option<u16>,
94 #[arg(long)]
95 new_timestamp: Option<i64>,
96 },
97 Close {
98 #[command(flatten)]
99 task_queue: TaskQueueArg,
100 #[arg(short, long)]
101 id: Option<u16>,
102 #[arg(long)]
104 description: Option<String>,
105 #[arg(
106 long,
107 default_value = "false",
108 help = "Close tasks that fail simulation"
109 )]
110 failed: bool,
111 },
112 Watch {
113 #[command(flatten)]
114 task_queue: TaskQueueArg,
115 #[arg(
116 long,
117 help = "Description prefix to watch for (can be specified multiple times)"
118 )]
119 description: Vec<String>,
120 },
121}
122
123async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<SimulationResult>> {
124 let run_ix_res = tuktuk_sdk::compiled_transaction::run_ix(
126 client.as_ref(),
127 client.as_ref(),
128 task_key,
129 client.payer.pubkey(),
130 &HashSet::new(),
131 )
132 .await;
133
134 match run_ix_res {
135 Ok(run_ix) => {
136 let mut updated_instructions = vec![
138 solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(
139 1900000,
140 ),
141 ];
142 updated_instructions.extend(run_ix.instructions.clone());
143 let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
144 let message = VersionedMessage::V0(v0::Message::try_compile(
145 &client.payer.pubkey(),
146 &updated_instructions,
147 &run_ix.lookup_tables,
148 recent_blockhash,
149 )?);
150 let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
151 let sim_result = client
152 .rpc_client
153 .simulate_transaction_with_config(
154 &tx,
155 RpcSimulateTransactionConfig {
156 commitment: Some(
157 solana_sdk::commitment_config::CommitmentConfig::confirmed(),
158 ),
159 sig_verify: true,
160 ..Default::default()
161 },
162 )
163 .await;
164
165 match sim_result {
166 Ok(simulated) => Ok(Some(SimulationResult {
167 error: simulated.value.err.map(|e| e.to_string()),
168 logs: Some(simulated.value.logs.unwrap_or_default()),
169 compute_units: simulated.value.units_consumed,
170 })),
171 Err(err) => Ok(Some(SimulationResult {
172 error: Some(err.to_string()),
173 logs: None,
174 compute_units: None,
175 })),
176 }
177 }
178 Err(tuktuk_sdk::error::Error::AccountNotFound) => Ok(None),
179 Err(e) => Ok(Some(SimulationResult {
180 error: Some(e.to_string()),
181 logs: None,
182 compute_units: None,
183 })),
184 }
185}
186
/// Outcome of simulating a task's run transaction, as included in the JSON task listing.
#[derive(Clone, Serialize)]
struct SimulationResult {
    /// Error message when the simulation (or preparing it) failed; `None` when the
    /// simulated transaction reported no error.
    pub error: Option<String>,
    /// Log lines returned by the simulation; `None` when no simulation response was obtained.
    pub logs: Option<Vec<String>>,
    /// Units consumed as reported by the simulation response, if any.
    pub compute_units: Option<u64>,
}
193
194async fn handle_task_completion(client: &CliClient, task_key: Pubkey, task_id: u16) -> Result {
195 println!(
196 "Task {} completed! Getting transaction signature...",
197 task_id
198 );
199
200 let signatures = client
202 .rpc_client
203 .get_signatures_for_address_with_config(
204 &task_key,
205 GetConfirmedSignaturesForAddress2Config {
206 limit: Some(10),
207 ..Default::default()
208 },
209 )
210 .await?;
211
212 if signatures.is_empty() {
213 println!("No transaction signature found for task {}", task_id);
214 return Ok(());
215 }
216
217 let recent_signatures: Vec<solana_sdk::signature::Signature> = signatures
219 .iter()
220 .take(10)
221 .map(|sig_info| sig_info.signature.parse().unwrap())
222 .collect();
223
224 let signature_statuses = client
226 .rpc_client
227 .get_signature_statuses_with_history(&recent_signatures)
228 .await?;
229
230 let mut successful_signature = None;
232 for (i, status_result) in signature_statuses.value.iter().enumerate() {
233 match status_result {
234 Some(status) => {
235 if status.err.is_none() {
237 successful_signature = Some(recent_signatures[i].to_string());
238 break;
239 }
240 }
241 None => {
242 continue;
244 }
245 }
246 }
247
248 if let Some(signature) = successful_signature {
249 println!("Successful transaction signature: {}", signature);
250
251 match client
253 .rpc_client
254 .get_transaction_with_config(
255 &signature.parse()?,
256 RpcTransactionConfig {
257 encoding: Some(UiTransactionEncoding::Json),
258 max_supported_transaction_version: Some(0),
259 ..Default::default()
260 },
261 )
262 .await
263 {
264 Ok(tx) => {
265 if let Some(meta) = tx.transaction.meta {
266 match meta.log_messages {
267 solana_transaction_status_client_types::option_serializer::OptionSerializer::Some(logs) => {
268 println!("Transaction logs:");
269 for log in logs {
270 println!(" {}", log);
271 }
272 }
273 _ => {
274 println!("No logs found in transaction");
275 }
276 }
277 } else {
278 println!("No transaction metadata found");
279 }
280 }
281 Err(e) => {
282 println!("Error getting transaction details: {}", e);
283 }
284 }
285 } else {
286 println!(
287 "No successful transaction found for task {} (all {} recent transactions failed)",
288 task_id,
289 recent_signatures.len()
290 );
291 }
292
293 Ok(())
294}
295
296impl TaskCmd {
297 pub async fn run(&self, opts: Opts) -> Result {
298 match &self.cmd {
299 Cmd::List {
300 task_queue,
301 description,
302 skip_simulate,
303 active,
304 limit,
305 successful,
306 } => {
307 let client = opts.client().await?;
308 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
309
310 let task_queue: TaskQueueV0 = client
311 .as_ref()
312 .anchor_account(&task_queue_pubkey)
313 .await?
314 .ok_or_else(|| anyhow!("Topic account not found"))?;
315 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
316 let tasks = client
317 .as_ref()
318 .anchor_accounts::<TaskV0>(&task_keys)
319 .await?;
320
321 let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
322 let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
323 let now = clock.unix_timestamp;
324
325 let filtered_tasks = tasks
326 .into_iter()
327 .filter(|(_, task)| {
328 if let Some(task) = task {
329 if let Some(description) = description {
330 if !task.description.starts_with(description) {
331 return false;
332 }
333 }
334
335 return !*active || task.trigger.is_active(now);
338 }
339 false
340 })
341 .collect::<Vec<_>>();
342
343 let mut json_tasks = Vec::with_capacity(filtered_tasks.len());
344 let mut simulation_tasks = Vec::new();
345
346 for (i, (pubkey, maybe_task)) in filtered_tasks.into_iter().enumerate() {
347 if let Some(task) = maybe_task {
348 if !*skip_simulate && task.trigger.is_active(now) {
349 simulation_tasks.push((i, pubkey));
350 }
351
352 json_tasks.push((
353 i,
354 Task {
355 pubkey,
356 id: task.id,
357 description: task.description,
358 trigger: Trigger::from(task.trigger),
359 crank_reward: task.crank_reward,
360 rent_refund: task.rent_refund,
361 simulation_result: None,
362 transaction: if self.verbose {
363 Some(TransactionSource::from(task.transaction.clone()))
364 } else {
365 None
366 },
367 },
368 ));
369
370 if let Some(limit) = limit {
371 if json_tasks.len() >= *limit as usize {
372 break;
373 }
374 }
375 }
376 }
377
378 let client = Arc::new(client);
380 let simulation_results = futures::stream::iter(simulation_tasks)
381 .map(|(i, pubkey)| {
382 let client = client.clone();
383 async move {
384 let result = simulate_task(&client, pubkey).await;
385 (i, result)
386 }
387 })
388 .buffer_unordered(10)
389 .collect::<Vec<_>>()
390 .await;
391
392 let mut results = vec![None; json_tasks.len()];
393 for (i, result) in simulation_results {
394 if let Ok(sim_result) = result {
395 results[i] = sim_result;
396 }
397 }
398
399 for (i, task) in json_tasks.iter_mut() {
401 task.simulation_result = results[*i].clone();
402 }
403
404 let mut final_tasks = json_tasks
406 .into_iter()
407 .map(|(_, task)| task)
408 .collect::<Vec<_>>();
409 if let Some(successful) = successful {
410 final_tasks.retain(|task| {
411 if let Some(simulation_result) = &task.simulation_result {
412 (*successful && simulation_result.error.is_none())
413 || (!*successful && simulation_result.error.is_some())
414 } else {
415 !*successful
416 }
417 });
418 }
419
420 print_json(&final_tasks)?;
421 }
422 Cmd::Close {
423 task_queue,
424 id: index,
425 description,
426 failed,
427 } => {
428 if index.is_none() && description.is_none() {
429 return Err(anyhow!("Either id or description must be provided"));
430 }
431 if index.is_some() && description.is_some() {
432 return Err(anyhow!("Only one of id or description can be provided"));
433 }
434 let client = opts.client().await?;
435 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
436 let task_queue: TaskQueueV0 = client
437 .as_ref()
438 .anchor_account(&task_queue_pubkey)
439 .await?
440 .ok_or_else(|| anyhow!("Task queue account not found"))?;
441 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
442 let tasks = if let Some(index) = index {
443 let task_key = tuktuk::task::key(&task_queue_pubkey, *index);
444 let task = client
445 .as_ref()
446 .anchor_account::<TaskV0>(&task_key)
447 .await?
448 .ok_or_else(|| anyhow!("Task not found"))?;
449 vec![(task_key, task)]
450 } else if let Some(description) = description {
451 let tasks = client
452 .as_ref()
453 .anchor_accounts::<TaskV0>(&task_keys)
454 .await?;
455 let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
456 let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
457 let now = clock.unix_timestamp;
458 tasks
459 .into_iter()
460 .filter(|(_, task)| {
461 if let Some(task) = task {
462 if *failed && !task.trigger.is_active(now) {
463 return false;
464 }
465 return task.description.starts_with(description);
466 }
467
468 false
469 })
470 .map(|(p, task)| (p, task.unwrap().clone()))
471 .collect()
472 } else {
473 vec![]
474 };
475
476 let mut seen_ids = HashSet::new();
477 let mut to_close = Vec::new();
478
479 let client = Arc::new(client);
481 let simulation_tasks = tasks
482 .iter()
483 .filter(|(_, task)| seen_ids.insert(task.id))
484 .map(|(pubkey, _)| *pubkey)
485 .collect::<Vec<_>>();
486
487 let mut simulation_results = HashMap::new();
488 if *failed {
489 let results = futures::stream::iter(simulation_tasks)
491 .map(|pubkey| {
492 let client = client.clone();
493 async move {
494 let result = simulate_task(&client, pubkey).await;
495 (pubkey, result)
496 }
497 })
498 .buffer_unordered(10)
499 .collect::<Vec<_>>()
500 .await;
501
502 simulation_results = results.into_iter().collect();
504 }
505
506 for (pubkey, task) in &tasks {
508 if seen_ids.contains(&task.id) {
509 if *failed {
510 if let Some(Ok(Some(sim_result))) = simulation_results.get(pubkey) {
511 if sim_result.error.is_some() {
512 to_close.push(task.clone());
513 }
514 }
515 } else {
516 to_close.push(task.clone());
517 }
518 }
519 }
520
521 let ixs = to_close
522 .into_iter()
523 .map(|task| {
524 tuktuk::task::dequeue_ix(
525 task_queue_pubkey,
526 client.payer.pubkey(),
527 task.rent_refund,
528 task.id,
529 )
530 .map_err(|e| anyhow!("Failed to dequeue task: {}", e))
531 })
532 .collect::<Result<Vec<_>>>()?;
533
534 let ix_groups = ixs.into_iter().map(|ix| vec![ix]).collect_vec();
535 let groups = pack_instructions_into_transactions(
536 &ix_groups.iter().map(|ix| ix.as_slice()).collect_vec(),
537 None,
538 )?;
539
540 for mut to_send in groups {
541 to_send.instructions.remove(0);
543 to_send.instructions.remove(0);
544 send_instructions(
545 client.rpc_client.clone(),
546 &client.payer,
547 client.opts.ws_url().as_str(),
548 &to_send.instructions,
549 &[],
550 )
551 .await?;
552 }
553 }
554 Cmd::Run {
555 task_queue,
556 id,
557 skip_preflight,
558 description,
559 } => {
560 if id.is_none() && description.is_none() {
561 return Err(anyhow!("Either id or description must be provided"));
562 }
563 if id.is_some() && description.is_some() {
564 return Err(anyhow!("Only one of id or description can be provided"));
565 }
566 let client = opts.client().await?;
567 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
568 let task_queue: TaskQueueV0 = client
569 .as_ref()
570 .anchor_account(&task_queue_pubkey)
571 .await?
572 .ok_or_else(|| anyhow!("Task queue account not found"))?;
573 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
574 let tasks = if let Some(id) = id {
575 let task_key = tuktuk::task::key(&task_queue_pubkey, *id);
576 let task = client
577 .as_ref()
578 .anchor_account::<TaskV0>(&task_key)
579 .await?
580 .ok_or_else(|| anyhow!("Task not found"))?;
581 vec![(task_key, task)]
582 } else if let Some(description) = description {
583 let tasks = client
584 .as_ref()
585 .anchor_accounts::<TaskV0>(&task_keys)
586 .await?;
587 tasks
588 .into_iter()
589 .filter(|(_, task)| {
590 if let Some(task) = task {
591 return task.description.starts_with(description);
592 }
593 false
594 })
595 .map(|(p, task)| (p, task.unwrap().clone()))
596 .collect()
597 } else {
598 vec![]
599 };
600 for (task_key, _) in tasks {
601 let run_ix_result = tuktuk_sdk::compiled_transaction::run_ix(
602 client.as_ref(),
603 client.as_ref(),
604 task_key,
605 client.payer.pubkey(),
606 &HashSet::new(),
607 )
608 .await;
609 match run_ix_result {
610 Ok(run_ix) => {
611 let blockhash = client.rpc_client.get_latest_blockhash().await?;
612 let (computed, _) = auto_compute_limit_and_price(
613 &client.rpc_client,
614 &run_ix.instructions,
615 1.2,
616 &client.payer.pubkey(),
617 Some(blockhash),
618 Some(run_ix.lookup_tables.clone()),
619 )
620 .await
621 .unwrap();
622
623 let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
624 let message = VersionedMessage::V0(v0::Message::try_compile(
625 &client.payer.pubkey(),
626 &computed,
627 &run_ix.lookup_tables,
628 recent_blockhash,
629 )?);
630 let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
631 let txid = client
632 .rpc_client
633 .send_transaction_with_config(
634 &tx,
635 solana_client::rpc_config::RpcSendTransactionConfig {
636 skip_preflight: *skip_preflight,
637 preflight_commitment: Some(CommitmentLevel::Confirmed),
638 ..Default::default()
639 },
640 )
641 .await?;
642
643 println!("Tx sent: {txid}");
644 }
645 Err(e) => {
646 println!("Error running task: {e:?}");
647 }
648 }
649 }
650 }
651 Cmd::Requeue {
652 task_queue,
653 id,
654 new_timestamp,
655 stale,
656 description,
657 after_id,
658 } => {
659 let client = opts.client().await?;
660 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
661 let task_queue: TaskQueueV0 = client
662 .as_ref()
663 .anchor_account(&task_queue_pubkey)
664 .await?
665 .ok_or_else(|| anyhow!("Topic account not found"))?;
666 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
667 let tasks = client
668 .as_ref()
669 .anchor_accounts::<TaskV0>(&task_keys)
670 .await?;
671
672 let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
673 let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
674 let now = clock.unix_timestamp;
675
676 let filtered_tasks = tasks.into_iter().filter(|(_, task)| {
677 if let Some(task) = task {
678 if *stale {
679 let is_stale = task.trigger.is_active(now)
680 && match task.trigger {
681 TriggerV0::Now => false,
682 TriggerV0::Timestamp(ts) => {
683 now - ts > task_queue.stale_task_age as i64
684 }
685 };
686
687 if !is_stale {
688 return false;
689 }
690 }
691
692 if let Some(description) = description {
693 if !task.description.starts_with(description) {
694 return false;
695 }
696 }
697
698 if let Some(after_id) = after_id {
699 if task.id <= *after_id {
700 return false;
701 }
702 }
703
704 if let Some(id) = id {
705 if task.id != *id {
706 return false;
707 }
708 }
709
710 return true;
711 }
712 false
713 });
714
715 let collected_tasks = filtered_tasks
716 .into_iter()
717 .flat_map(|(_, task)| task)
718 .collect_vec();
719
720 println!("Requeueing {} tasks", collected_tasks.len());
721
722 for task in collected_tasks {
723 let (new_task_key, ix) = tuktuk::task::queue(
724 client.as_ref(),
725 client.payer.pubkey(),
726 client.payer.pubkey(),
727 task_queue_pubkey,
728 QueueTaskArgsV0 {
729 id: task.id,
730 trigger: new_timestamp.map_or(TriggerV0::Now, TriggerV0::Timestamp),
731 transaction: task.transaction.clone(),
732 crank_reward: Some(task.crank_reward),
733 free_tasks: task.free_tasks,
734 description: task.description,
735 },
736 )
737 .await?;
738
739 send_instructions(
740 client.rpc_client.clone(),
741 &client.payer,
742 client.opts.ws_url().as_str(),
743 &[ix],
744 &[],
745 )
746 .await?;
747
748 println!("New task key: {new_task_key}");
749 }
750 }
751 Cmd::Watch {
752 task_queue,
753 description,
754 } => {
755 if description.is_empty() {
756 return Err(anyhow!(
757 "At least one description must be provided for watch command"
758 ));
759 }
760
761 let client = opts.client().await?;
762 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
763 let task_queue: TaskQueueV0 = client
764 .as_ref()
765 .anchor_account(&task_queue_pubkey)
766 .await?
767 .ok_or_else(|| anyhow!("Task queue account not found"))?;
768
769 let trimmed_descriptions: Vec<String> = description
770 .iter()
771 .map(|prefix| {
772 if prefix.len() > 40 {
773 prefix.chars().take(40).collect()
774 } else {
775 prefix.clone()
776 }
777 })
778 .collect();
779
780 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
782 let existing_tasks = client
783 .as_ref()
784 .anchor_accounts::<TaskV0>(&task_keys)
785 .await?;
786
787 let mut watched_tasks = std::collections::HashMap::new();
788
789 for (task_key, maybe_task) in existing_tasks {
791 if let Some(task) = maybe_task {
792 let matches = trimmed_descriptions
794 .iter()
795 .any(|prefix| task.description.starts_with(prefix));
796 if matches {
797 println!(
798 "Found existing matching task: {} (ID: {}, KEY: {})",
799 task.description, task.id, task_key
800 );
801 watched_tasks.insert(task_key, task.id);
802 }
803 }
804 }
805
806 let (pubsub_client_raw, _pubsub_handle, _shutdown_sender) =
808 tuktuk_sdk::pubsub_client::PubsubClient::new(client.opts.ws_url().as_str())
809 .await?;
810 let pubsub_client = Arc::new(pubsub_client_raw);
811 let pubsub_tracker = Arc::new(tuktuk_sdk::watcher::PubsubTracker::new(
812 client.rpc_client.clone(),
813 pubsub_client,
814 Duration::from_secs(30),
815 solana_sdk::commitment_config::CommitmentConfig::confirmed(),
816 ));
817
818 let (stream, _unsub) = tuktuk::task::on_new(
820 client.as_ref(),
821 &pubsub_tracker,
822 &task_queue_pubkey,
823 &task_queue,
824 )
825 .await?;
826 println!(
827 "Watching for tasks with description prefixes: {:?}",
828 trimmed_descriptions
829 );
830 println!("Press Ctrl+C to stop watching...");
831
832 let mut stream = Box::pin(stream);
833
834 while let Some(update) = stream.next().await {
835 match update {
836 Ok(task_update) => {
837 for (task_key, maybe_task) in task_update.tasks {
839 if let Some(task) = maybe_task {
840 let matches = trimmed_descriptions
842 .iter()
843 .any(|prefix| task.description.starts_with(prefix));
844 if matches {
845 println!(
846 "Found matching task: {} (ID: {}, KEY: {})",
847 task.description, task.id, task_key
848 );
849 watched_tasks.insert(task_key, task.id);
850 }
851 } else {
852 if let Some(task_id) = watched_tasks.remove(&task_key) {
854 if let Err(e) =
855 handle_task_completion(&client, task_key, task_id).await
856 {
857 eprintln!("Error handling task completion: {}", e);
858 }
859 }
860 }
861 }
862
863 for removed_task_key in task_update.removed {
865 if let Some(task_id) = watched_tasks.remove(&removed_task_key) {
866 if let Err(e) =
867 handle_task_completion(&client, removed_task_key, task_id)
868 .await
869 {
870 eprintln!("Error handling task completion: {}", e);
871 }
872 }
873 }
874 }
875 Err(e) => {
876 eprintln!("Error receiving task update: {}", e);
877 }
878 }
879 }
880 }
881 }
882 Ok(())
883 }
884}
885
/// JSON view of a queued task as printed by the `List` subcommand.
#[derive(Serialize)]
struct Task {
    /// Address of the task account.
    #[serde(with = "serde_pubkey")]
    pub pubkey: Pubkey,
    /// Task id, copied from the on-chain task.
    pub id: u16,
    /// Task description, copied from the on-chain task.
    pub description: String,
    /// The on-chain task's `rent_refund` key.
    #[serde(with = "serde_pubkey")]
    pub rent_refund: Pubkey,
    /// When the task becomes runnable.
    pub trigger: Trigger,
    /// The on-chain task's `crank_reward` value.
    pub crank_reward: u64,
    /// Simulation outcome; `None` when the task was not simulated.
    pub simulation_result: Option<SimulationResult>,
    /// The task's transaction source; only populated in verbose mode.
    pub transaction: Option<TransactionSource>,
}
899
/// Serializable counterpart of `TriggerV0` that adds a human-readable time.
#[derive(Serialize)]
enum Trigger {
    /// Mirrors `TriggerV0::Now`.
    Now,
    /// Mirrors `TriggerV0::Timestamp`.
    Timestamp {
        /// The raw timestamp carried by the trigger.
        epoch: i64,
        /// `epoch` rendered as an RFC 3339 string in the local time zone; serialized as
        /// `human_readable`.
        #[serde(rename = "human_readable")]
        formatted: String,
    },
}
909
910impl From<TriggerV0> for Trigger {
911 fn from(trigger: TriggerV0) -> Self {
912 match trigger {
913 TriggerV0::Now => Trigger::Now,
914 TriggerV0::Timestamp(ts) => Trigger::Timestamp {
915 epoch: ts,
916 formatted: Local
917 .timestamp_opt(ts, 0)
918 .single()
919 .unwrap_or_else(Local::now)
920 .to_rfc3339(),
921 },
922 }
923 }
924}