1use std::{
2 collections::{HashMap, HashSet},
3 sync::Arc,
4 time::Duration,
5};
6
7use anyhow::anyhow;
8use chrono::{Local, TimeZone};
9use clap::{Args, Subcommand};
10use clock::SYSVAR_CLOCK;
11use futures::stream::StreamExt;
12use itertools::Itertools;
13use serde::Serialize;
14use solana_client::{
15 rpc_client::GetConfirmedSignaturesForAddress2Config,
16 rpc_config::{RpcSimulateTransactionConfig, RpcTransactionConfig},
17};
18use solana_sdk::{
19 commitment_config::CommitmentLevel,
20 message::{v0, VersionedMessage},
21 pubkey::Pubkey,
22 signer::Signer,
23 transaction::VersionedTransaction,
24};
25use solana_transaction_status_client_types::UiTransactionEncoding;
26use solana_transaction_utils::{
27 pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
28};
29use tuktuk_program::{
30 types::{QueueTaskArgsV0, TriggerV0},
31 TaskQueueV0, TaskV0,
32};
33use tuktuk_sdk::prelude::*;
34
35use super::{task_queue::TaskQueueArg, TransactionSource};
36use crate::{
37 client::{send_instructions, CliClient},
38 cmd::Opts,
39 result::Result,
40 serde::{print_json, serde_pubkey},
41};
42
43#[derive(Debug, Args)]
44pub struct TaskCmd {
45 #[arg(long, default_value = "false")]
46 pub verbose: bool,
47 #[command(subcommand)]
48 pub cmd: Cmd,
49}
50
51#[derive(Debug, Subcommand)]
52pub enum Cmd {
53 List {
54 #[command(flatten)]
55 task_queue: TaskQueueArg,
56 #[arg(long)]
58 description: Option<String>,
59 #[arg(long, default_value = "false")]
60 skip_simulate: bool,
61 #[arg(
62 long,
63 help = "Only show tasks that could be executed now",
64 default_value = "false"
65 )]
66 active: bool,
67 #[arg(long, help = "Show tasks with a succesful/failed simulation")]
68 successful: Option<bool>,
69 #[arg(long, help = "Limit the number of tasks returned")]
70 limit: Option<u32>,
71 },
72 Run {
73 #[command(flatten)]
74 task_queue: TaskQueueArg,
75 #[arg(short, long)]
76 id: Option<u16>,
77 #[arg(long)]
79 description: Option<String>,
80 #[arg(short, long, default_value = "false")]
81 skip_preflight: bool,
82 },
83 Requeue {
84 #[command(flatten)]
85 task_queue: TaskQueueArg,
86 #[arg(short, long)]
87 id: Option<u16>,
88 #[arg(short, long, default_value = "false", help = "Requeue all stale tasks")]
89 stale: bool,
90 #[arg(long)]
91 description: Option<String>,
92 #[arg(long)]
93 after_id: Option<u16>,
94 #[arg(long)]
95 new_timestamp: Option<i64>,
96 },
97 Close {
98 #[command(flatten)]
99 task_queue: TaskQueueArg,
100 #[arg(short, long)]
101 id: Option<u16>,
102 #[arg(long)]
104 description: Option<String>,
105 #[arg(
106 long,
107 default_value = "false",
108 help = "Close tasks that fail simulation"
109 )]
110 failed: bool,
111 },
112 Watch {
113 #[command(flatten)]
114 task_queue: TaskQueueArg,
115 #[arg(
116 long,
117 help = "Description prefix to watch for (can be specified multiple times)"
118 )]
119 description: Vec<String>,
120 },
121}
122
123async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<SimulationResult>> {
124 let run_ix_res = tuktuk_sdk::compiled_transaction::run_ix(
126 client.as_ref(),
127 task_key,
128 client.payer.pubkey(),
129 &HashSet::new(),
130 )
131 .await;
132
133 match run_ix_res {
134 Ok(run_ix) => {
135 let mut updated_instructions = vec![
137 solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(
138 1900000,
139 ),
140 ];
141 updated_instructions.extend(run_ix.instructions.clone());
142 let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
143 let message = VersionedMessage::V0(v0::Message::try_compile(
144 &client.payer.pubkey(),
145 &updated_instructions,
146 &run_ix.lookup_tables,
147 recent_blockhash,
148 )?);
149 let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
150 let sim_result = client
151 .rpc_client
152 .simulate_transaction_with_config(
153 &tx,
154 RpcSimulateTransactionConfig {
155 commitment: Some(
156 solana_sdk::commitment_config::CommitmentConfig::confirmed(),
157 ),
158 sig_verify: true,
159 ..Default::default()
160 },
161 )
162 .await;
163
164 match sim_result {
165 Ok(simulated) => Ok(Some(SimulationResult {
166 error: simulated.value.err.map(|e| e.to_string()),
167 logs: Some(simulated.value.logs.unwrap_or_default()),
168 compute_units: simulated.value.units_consumed,
169 })),
170 Err(err) => Ok(Some(SimulationResult {
171 error: Some(err.to_string()),
172 logs: None,
173 compute_units: None,
174 })),
175 }
176 }
177 Err(tuktuk_sdk::error::Error::AccountNotFound) => Ok(None),
178 Err(e) => Ok(Some(SimulationResult {
179 error: Some(e.to_string()),
180 logs: None,
181 compute_units: None,
182 })),
183 }
184}
185
186#[derive(Clone, Serialize)]
187struct SimulationResult {
188 pub error: Option<String>,
189 pub logs: Option<Vec<String>>,
190 pub compute_units: Option<u64>,
191}
192
193async fn handle_task_completion(client: &CliClient, task_key: Pubkey, task_id: u16) -> Result {
194 println!(
195 "Task {} completed! Getting transaction signature...",
196 task_id
197 );
198
199 let signatures = client
201 .rpc_client
202 .get_signatures_for_address_with_config(
203 &task_key,
204 GetConfirmedSignaturesForAddress2Config {
205 limit: Some(10),
206 ..Default::default()
207 },
208 )
209 .await?;
210
211 if signatures.is_empty() {
212 println!("No transaction signature found for task {}", task_id);
213 return Ok(());
214 }
215
216 let recent_signatures: Vec<solana_sdk::signature::Signature> = signatures
218 .iter()
219 .take(10)
220 .map(|sig_info| sig_info.signature.parse().unwrap())
221 .collect();
222
223 let signature_statuses = client
225 .rpc_client
226 .get_signature_statuses_with_history(&recent_signatures)
227 .await?;
228
229 let mut successful_signature = None;
231 for (i, status_result) in signature_statuses.value.iter().enumerate() {
232 match status_result {
233 Some(status) => {
234 if status.err.is_none() {
236 successful_signature = Some(recent_signatures[i].to_string());
237 break;
238 }
239 }
240 None => {
241 continue;
243 }
244 }
245 }
246
247 if let Some(signature) = successful_signature {
248 println!("Successful transaction signature: {}", signature);
249
250 match client
252 .rpc_client
253 .get_transaction_with_config(
254 &signature.parse()?,
255 RpcTransactionConfig {
256 encoding: Some(UiTransactionEncoding::Json),
257 max_supported_transaction_version: Some(0),
258 ..Default::default()
259 },
260 )
261 .await
262 {
263 Ok(tx) => {
264 if let Some(meta) = tx.transaction.meta {
265 match meta.log_messages {
266 solana_transaction_status_client_types::option_serializer::OptionSerializer::Some(logs) => {
267 println!("Transaction logs:");
268 for log in logs {
269 println!(" {}", log);
270 }
271 }
272 _ => {
273 println!("No logs found in transaction");
274 }
275 }
276 } else {
277 println!("No transaction metadata found");
278 }
279 }
280 Err(e) => {
281 println!("Error getting transaction details: {}", e);
282 }
283 }
284 } else {
285 println!(
286 "No successful transaction found for task {} (all {} recent transactions failed)",
287 task_id,
288 recent_signatures.len()
289 );
290 }
291
292 Ok(())
293}
294
295impl TaskCmd {
296 pub async fn run(&self, opts: Opts) -> Result {
297 match &self.cmd {
298 Cmd::List {
299 task_queue,
300 description,
301 skip_simulate,
302 active,
303 limit,
304 successful,
305 } => {
306 let client = opts.client().await?;
307 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
308
309 let task_queue: TaskQueueV0 = client
310 .as_ref()
311 .anchor_account(&task_queue_pubkey)
312 .await?
313 .ok_or_else(|| anyhow!("Topic account not found"))?;
314 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
315 let tasks = client
316 .as_ref()
317 .anchor_accounts::<TaskV0>(&task_keys)
318 .await?;
319
320 let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
321 let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
322 let now = clock.unix_timestamp;
323
324 let filtered_tasks = tasks
325 .into_iter()
326 .filter(|(_, task)| {
327 if let Some(task) = task {
328 if let Some(description) = description {
329 if !task.description.starts_with(description) {
330 return false;
331 }
332 }
333
334 return !*active || task.trigger.is_active(now);
337 }
338 false
339 })
340 .collect::<Vec<_>>();
341
342 let mut json_tasks = Vec::with_capacity(filtered_tasks.len());
343 let mut simulation_tasks = Vec::new();
344
345 for (i, (pubkey, maybe_task)) in filtered_tasks.into_iter().enumerate() {
346 if let Some(task) = maybe_task {
347 if !*skip_simulate && task.trigger.is_active(now) {
348 simulation_tasks.push((i, pubkey));
349 }
350
351 json_tasks.push((
352 i,
353 Task {
354 pubkey,
355 id: task.id,
356 description: task.description,
357 trigger: Trigger::from(task.trigger),
358 crank_reward: task.crank_reward,
359 rent_refund: task.rent_refund,
360 simulation_result: None,
361 transaction: if self.verbose {
362 Some(TransactionSource::from(task.transaction.clone()))
363 } else {
364 None
365 },
366 },
367 ));
368
369 if let Some(limit) = limit {
370 if json_tasks.len() >= *limit as usize {
371 break;
372 }
373 }
374 }
375 }
376
377 let client = Arc::new(client);
379 let simulation_results = futures::stream::iter(simulation_tasks)
380 .map(|(i, pubkey)| {
381 let client = client.clone();
382 async move {
383 let result = simulate_task(&client, pubkey).await;
384 (i, result)
385 }
386 })
387 .buffer_unordered(10)
388 .collect::<Vec<_>>()
389 .await;
390
391 let mut results = vec![None; json_tasks.len()];
392 for (i, result) in simulation_results {
393 if let Ok(sim_result) = result {
394 results[i] = sim_result;
395 }
396 }
397
398 for (i, task) in json_tasks.iter_mut() {
400 task.simulation_result = results[*i].clone();
401 }
402
403 let mut final_tasks = json_tasks
405 .into_iter()
406 .map(|(_, task)| task)
407 .collect::<Vec<_>>();
408 if let Some(successful) = successful {
409 final_tasks.retain(|task| {
410 if let Some(simulation_result) = &task.simulation_result {
411 (*successful && simulation_result.error.is_none())
412 || (!*successful && simulation_result.error.is_some())
413 } else {
414 !*successful
415 }
416 });
417 }
418
419 print_json(&final_tasks)?;
420 }
421 Cmd::Close {
422 task_queue,
423 id: index,
424 description,
425 failed,
426 } => {
427 if index.is_none() && description.is_none() {
428 return Err(anyhow!("Either id or description must be provided"));
429 }
430 if index.is_some() && description.is_some() {
431 return Err(anyhow!("Only one of id or description can be provided"));
432 }
433 let client = opts.client().await?;
434 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
435 let task_queue: TaskQueueV0 = client
436 .as_ref()
437 .anchor_account(&task_queue_pubkey)
438 .await?
439 .ok_or_else(|| anyhow!("Task queue account not found"))?;
440 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
441 let tasks = if let Some(index) = index {
442 let task_key = tuktuk::task::key(&task_queue_pubkey, *index);
443 let task = client
444 .as_ref()
445 .anchor_account::<TaskV0>(&task_key)
446 .await?
447 .ok_or_else(|| anyhow!("Task not found"))?;
448 vec![(task_key, task)]
449 } else if let Some(description) = description {
450 let tasks = client
451 .as_ref()
452 .anchor_accounts::<TaskV0>(&task_keys)
453 .await?;
454 let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
455 let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
456 let now = clock.unix_timestamp;
457 tasks
458 .into_iter()
459 .filter(|(_, task)| {
460 if let Some(task) = task {
461 if *failed && !task.trigger.is_active(now) {
462 return false;
463 }
464 return task.description.starts_with(description);
465 }
466
467 false
468 })
469 .map(|(p, task)| (p, task.unwrap().clone()))
470 .collect()
471 } else {
472 vec![]
473 };
474
475 let mut seen_ids = HashSet::new();
476 let mut to_close = Vec::new();
477
478 let client = Arc::new(client);
480 let simulation_tasks = tasks
481 .iter()
482 .filter(|(_, task)| seen_ids.insert(task.id))
483 .map(|(pubkey, _)| *pubkey)
484 .collect::<Vec<_>>();
485
486 let mut simulation_results = HashMap::new();
487 if *failed {
488 let results = futures::stream::iter(simulation_tasks)
490 .map(|pubkey| {
491 let client = client.clone();
492 async move {
493 let result = simulate_task(&client, pubkey).await;
494 (pubkey, result)
495 }
496 })
497 .buffer_unordered(10)
498 .collect::<Vec<_>>()
499 .await;
500
501 simulation_results = results.into_iter().collect();
503 }
504
505 for (pubkey, task) in &tasks {
507 if seen_ids.contains(&task.id) {
508 if *failed {
509 if let Some(Ok(Some(sim_result))) = simulation_results.get(pubkey) {
510 if sim_result.error.is_some() {
511 to_close.push(task.clone());
512 }
513 }
514 } else {
515 to_close.push(task.clone());
516 }
517 }
518 }
519
520 let ixs = to_close
521 .into_iter()
522 .map(|task| {
523 tuktuk::task::dequeue_ix(
524 task_queue_pubkey,
525 client.payer.pubkey(),
526 task.rent_refund,
527 task.id,
528 )
529 .map_err(|e| anyhow!("Failed to dequeue task: {}", e))
530 })
531 .collect::<Result<Vec<_>>>()?;
532
533 let ix_groups = ixs.into_iter().map(|ix| vec![ix]).collect_vec();
534 let groups = pack_instructions_into_transactions(
535 &ix_groups.iter().map(|ix| ix.as_slice()).collect_vec(),
536 None,
537 )?;
538
539 for mut to_send in groups {
540 to_send.instructions.remove(0);
542 to_send.instructions.remove(0);
543 send_instructions(
544 client.rpc_client.clone(),
545 &client.payer,
546 client.opts.ws_url().as_str(),
547 &to_send.instructions,
548 &[],
549 )
550 .await?;
551 }
552 }
553 Cmd::Run {
554 task_queue,
555 id,
556 skip_preflight,
557 description,
558 } => {
559 if id.is_none() && description.is_none() {
560 return Err(anyhow!("Either id or description must be provided"));
561 }
562 if id.is_some() && description.is_some() {
563 return Err(anyhow!("Only one of id or description can be provided"));
564 }
565 let client = opts.client().await?;
566 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
567 let task_queue: TaskQueueV0 = client
568 .as_ref()
569 .anchor_account(&task_queue_pubkey)
570 .await?
571 .ok_or_else(|| anyhow!("Task queue account not found"))?;
572 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
573 let tasks = if let Some(id) = id {
574 let task_key = tuktuk::task::key(&task_queue_pubkey, *id);
575 let task = client
576 .as_ref()
577 .anchor_account::<TaskV0>(&task_key)
578 .await?
579 .ok_or_else(|| anyhow!("Task not found"))?;
580 vec![(task_key, task)]
581 } else if let Some(description) = description {
582 let tasks = client
583 .as_ref()
584 .anchor_accounts::<TaskV0>(&task_keys)
585 .await?;
586 tasks
587 .into_iter()
588 .filter(|(_, task)| {
589 if let Some(task) = task {
590 return task.description.starts_with(description);
591 }
592 false
593 })
594 .map(|(p, task)| (p, task.unwrap().clone()))
595 .collect()
596 } else {
597 vec![]
598 };
599 for (task_key, _) in tasks {
600 let run_ix_result = tuktuk_sdk::compiled_transaction::run_ix(
601 client.as_ref(),
602 task_key,
603 client.payer.pubkey(),
604 &HashSet::new(),
605 )
606 .await;
607 match run_ix_result {
608 Ok(run_ix) => {
609 let blockhash = client.rpc_client.get_latest_blockhash().await?;
610 let (computed, _) = auto_compute_limit_and_price(
611 &client.rpc_client,
612 &run_ix.instructions,
613 1.2,
614 &client.payer.pubkey(),
615 Some(blockhash),
616 Some(run_ix.lookup_tables.clone()),
617 )
618 .await
619 .unwrap();
620
621 let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
622 let message = VersionedMessage::V0(v0::Message::try_compile(
623 &client.payer.pubkey(),
624 &computed,
625 &run_ix.lookup_tables,
626 recent_blockhash,
627 )?);
628 let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
629 let txid = client
630 .rpc_client
631 .send_transaction_with_config(
632 &tx,
633 solana_client::rpc_config::RpcSendTransactionConfig {
634 skip_preflight: *skip_preflight,
635 preflight_commitment: Some(CommitmentLevel::Confirmed),
636 ..Default::default()
637 },
638 )
639 .await?;
640
641 println!("Tx sent: {txid}");
642 }
643 Err(e) => {
644 println!("Error running task: {e:?}");
645 }
646 }
647 }
648 }
649 Cmd::Requeue {
650 task_queue,
651 id,
652 new_timestamp,
653 stale,
654 description,
655 after_id,
656 } => {
657 let client = opts.client().await?;
658 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
659 let task_queue: TaskQueueV0 = client
660 .as_ref()
661 .anchor_account(&task_queue_pubkey)
662 .await?
663 .ok_or_else(|| anyhow!("Topic account not found"))?;
664 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
665 let tasks = client
666 .as_ref()
667 .anchor_accounts::<TaskV0>(&task_keys)
668 .await?;
669
670 let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
671 let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
672 let now = clock.unix_timestamp;
673
674 let filtered_tasks = tasks.into_iter().filter(|(_, task)| {
675 if let Some(task) = task {
676 if *stale {
677 let is_stale = task.trigger.is_active(now)
678 && match task.trigger {
679 TriggerV0::Now => false,
680 TriggerV0::Timestamp(ts) => {
681 now - ts > task_queue.stale_task_age as i64
682 }
683 };
684
685 if !is_stale {
686 return false;
687 }
688 }
689
690 if let Some(description) = description {
691 if !task.description.starts_with(description) {
692 return false;
693 }
694 }
695
696 if let Some(after_id) = after_id {
697 if task.id <= *after_id {
698 return false;
699 }
700 }
701
702 if let Some(id) = id {
703 if task.id != *id {
704 return false;
705 }
706 }
707
708 return true;
709 }
710 false
711 });
712
713 let collected_tasks = filtered_tasks
714 .into_iter()
715 .flat_map(|(_, task)| task)
716 .collect_vec();
717
718 println!("Requeueing {} tasks", collected_tasks.len());
719
720 for task in collected_tasks {
721 let (new_task_key, ix) = tuktuk::task::queue(
722 client.as_ref(),
723 client.payer.pubkey(),
724 client.payer.pubkey(),
725 task_queue_pubkey,
726 QueueTaskArgsV0 {
727 id: task.id,
728 trigger: new_timestamp.map_or(TriggerV0::Now, TriggerV0::Timestamp),
729 transaction: task.transaction.clone(),
730 crank_reward: Some(task.crank_reward),
731 free_tasks: task.free_tasks,
732 description: task.description,
733 },
734 )
735 .await?;
736
737 send_instructions(
738 client.rpc_client.clone(),
739 &client.payer,
740 client.opts.ws_url().as_str(),
741 &[ix],
742 &[],
743 )
744 .await?;
745
746 println!("New task key: {new_task_key}");
747 }
748 }
749 Cmd::Watch {
750 task_queue,
751 description,
752 } => {
753 if description.is_empty() {
754 return Err(anyhow!(
755 "At least one description must be provided for watch command"
756 ));
757 }
758
759 let client = opts.client().await?;
760 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
761 let task_queue: TaskQueueV0 = client
762 .as_ref()
763 .anchor_account(&task_queue_pubkey)
764 .await?
765 .ok_or_else(|| anyhow!("Task queue account not found"))?;
766
767 let trimmed_descriptions: Vec<String> = description
768 .iter()
769 .map(|prefix| {
770 if prefix.len() > 40 {
771 prefix.chars().take(40).collect()
772 } else {
773 prefix.clone()
774 }
775 })
776 .collect();
777
778 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
780 let existing_tasks = client
781 .as_ref()
782 .anchor_accounts::<TaskV0>(&task_keys)
783 .await?;
784
785 let mut watched_tasks = std::collections::HashMap::new();
786
787 for (task_key, maybe_task) in existing_tasks {
789 if let Some(task) = maybe_task {
790 let matches = trimmed_descriptions
792 .iter()
793 .any(|prefix| task.description.starts_with(prefix));
794 if matches {
795 println!(
796 "Found existing matching task: {} (ID: {}, KEY: {})",
797 task.description, task.id, task_key
798 );
799 watched_tasks.insert(task_key, task.id);
800 }
801 }
802 }
803
804 let (pubsub_client_raw, _pubsub_handle, _shutdown_sender) =
806 tuktuk_sdk::pubsub_client::PubsubClient::new(client.opts.ws_url().as_str())
807 .await?;
808 let pubsub_client = Arc::new(pubsub_client_raw);
809 let pubsub_tracker = Arc::new(tuktuk_sdk::watcher::PubsubTracker::new(
810 client.rpc_client.clone(),
811 pubsub_client,
812 Duration::from_secs(30),
813 solana_sdk::commitment_config::CommitmentConfig::confirmed(),
814 ));
815
816 let (stream, _unsub) = tuktuk::task::on_new(
818 client.as_ref(),
819 &pubsub_tracker,
820 &task_queue_pubkey,
821 &task_queue,
822 )
823 .await?;
824 println!(
825 "Watching for tasks with description prefixes: {:?}",
826 trimmed_descriptions
827 );
828 println!("Press Ctrl+C to stop watching...");
829
830 let mut stream = Box::pin(stream);
831
832 while let Some(update) = stream.next().await {
833 match update {
834 Ok(task_update) => {
835 for (task_key, maybe_task) in task_update.tasks {
837 if let Some(task) = maybe_task {
838 let matches = trimmed_descriptions
840 .iter()
841 .any(|prefix| task.description.starts_with(prefix));
842 if matches {
843 println!(
844 "Found matching task: {} (ID: {}, KEY: {})",
845 task.description, task.id, task_key
846 );
847 watched_tasks.insert(task_key, task.id);
848 }
849 } else {
850 if let Some(task_id) = watched_tasks.remove(&task_key) {
852 if let Err(e) =
853 handle_task_completion(&client, task_key, task_id).await
854 {
855 eprintln!("Error handling task completion: {}", e);
856 }
857 }
858 }
859 }
860
861 for removed_task_key in task_update.removed {
863 if let Some(task_id) = watched_tasks.remove(&removed_task_key) {
864 if let Err(e) =
865 handle_task_completion(&client, removed_task_key, task_id)
866 .await
867 {
868 eprintln!("Error handling task completion: {}", e);
869 }
870 }
871 }
872 }
873 Err(e) => {
874 eprintln!("Error receiving task update: {}", e);
875 }
876 }
877 }
878 }
879 }
880 Ok(())
881 }
882}
883
884#[derive(Serialize)]
885struct Task {
886 #[serde(with = "serde_pubkey")]
887 pub pubkey: Pubkey,
888 pub id: u16,
889 pub description: String,
890 #[serde(with = "serde_pubkey")]
891 pub rent_refund: Pubkey,
892 pub trigger: Trigger,
893 pub crank_reward: u64,
894 pub simulation_result: Option<SimulationResult>,
895 pub transaction: Option<TransactionSource>,
896}
897
898#[derive(Serialize)]
899enum Trigger {
900 Now,
901 Timestamp {
902 epoch: i64,
903 #[serde(rename = "human_readable")]
904 formatted: String,
905 },
906}
907
908impl From<TriggerV0> for Trigger {
909 fn from(trigger: TriggerV0) -> Self {
910 match trigger {
911 TriggerV0::Now => Trigger::Now,
912 TriggerV0::Timestamp(ts) => Trigger::Timestamp {
913 epoch: ts,
914 formatted: Local
915 .timestamp_opt(ts, 0)
916 .single()
917 .unwrap_or_else(Local::now)
918 .to_rfc3339(),
919 },
920 }
921 }
922}