tuktuk_cli/cmd/
task.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4    time::Duration,
5};
6
7use anyhow::anyhow;
8use chrono::{Local, TimeZone};
9use clap::{Args, Subcommand};
10use clock::SYSVAR_CLOCK;
11use futures::stream::StreamExt;
12use itertools::Itertools;
13use serde::Serialize;
14use solana_client::{
15    rpc_client::GetConfirmedSignaturesForAddress2Config,
16    rpc_config::{RpcSimulateTransactionConfig, RpcTransactionConfig},
17};
18use solana_sdk::{
19    commitment_config::CommitmentLevel,
20    message::{v0, VersionedMessage},
21    pubkey::Pubkey,
22    signer::Signer,
23    transaction::VersionedTransaction,
24};
25use solana_transaction_status_client_types::UiTransactionEncoding;
26use solana_transaction_utils::{
27    pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
28};
29use tuktuk_program::{
30    types::{QueueTaskArgsV0, TriggerV0},
31    TaskQueueV0, TaskV0,
32};
33use tuktuk_sdk::prelude::*;
34
35use super::{task_queue::TaskQueueArg, TransactionSource};
36use crate::{
37    client::{send_instructions, CliClient},
38    cmd::Opts,
39    result::Result,
40    serde::{print_json, serde_pubkey},
41};
42
43#[derive(Debug, Args)]
44pub struct TaskCmd {
45    #[arg(long, default_value = "false")]
46    pub verbose: bool,
47    #[command(subcommand)]
48    pub cmd: Cmd,
49}
50
51#[derive(Debug, Subcommand)]
52pub enum Cmd {
53    List {
54        #[command(flatten)]
55        task_queue: TaskQueueArg,
56        // Description prefix for the task to filter by
57        #[arg(long)]
58        description: Option<String>,
59        #[arg(long, default_value = "false")]
60        skip_simulate: bool,
61        #[arg(
62            long,
63            help = "Only show tasks that could be executed now",
64            default_value = "false"
65        )]
66        active: bool,
67        #[arg(long, help = "Show tasks with a succesful/failed simulation")]
68        successful: Option<bool>,
69        #[arg(long, help = "Limit the number of tasks returned")]
70        limit: Option<u32>,
71    },
72    Run {
73        #[command(flatten)]
74        task_queue: TaskQueueArg,
75        #[arg(short, long)]
76        id: Option<u16>,
77        // Description prefix to run by
78        #[arg(long)]
79        description: Option<String>,
80        #[arg(short, long, default_value = "false")]
81        skip_preflight: bool,
82    },
83    Requeue {
84        #[command(flatten)]
85        task_queue: TaskQueueArg,
86        #[arg(short, long)]
87        id: Option<u16>,
88        #[arg(short, long, default_value = "false", help = "Requeue all stale tasks")]
89        stale: bool,
90        #[arg(long)]
91        description: Option<String>,
92        #[arg(long)]
93        after_id: Option<u16>,
94        #[arg(long)]
95        new_timestamp: Option<i64>,
96    },
97    Close {
98        #[command(flatten)]
99        task_queue: TaskQueueArg,
100        #[arg(short, long)]
101        id: Option<u16>,
102        // Description prefix to close by
103        #[arg(long)]
104        description: Option<String>,
105        #[arg(
106            long,
107            default_value = "false",
108            help = "Close tasks that fail simulation"
109        )]
110        failed: bool,
111    },
112    Watch {
113        #[command(flatten)]
114        task_queue: TaskQueueArg,
115        #[arg(
116            long,
117            help = "Description prefix to watch for (can be specified multiple times)"
118        )]
119        description: Vec<String>,
120    },
121}
122
123async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<SimulationResult>> {
124    // Get the run instruction
125    let run_ix_res = tuktuk_sdk::compiled_transaction::run_ix(
126        client.as_ref(),
127        client.as_ref(),
128        task_key,
129        client.payer.pubkey(),
130        &HashSet::new(),
131    )
132    .await;
133
134    match run_ix_res {
135        Ok(run_ix) => {
136            // Create and simulate the transaction
137            let mut updated_instructions = vec![
138                solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(
139                    1900000,
140                ),
141            ];
142            updated_instructions.extend(run_ix.instructions.clone());
143            let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
144            let message = VersionedMessage::V0(v0::Message::try_compile(
145                &client.payer.pubkey(),
146                &updated_instructions,
147                &run_ix.lookup_tables,
148                recent_blockhash,
149            )?);
150            let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
151            let sim_result = client
152                .rpc_client
153                .simulate_transaction_with_config(
154                    &tx,
155                    RpcSimulateTransactionConfig {
156                        commitment: Some(
157                            solana_sdk::commitment_config::CommitmentConfig::confirmed(),
158                        ),
159                        sig_verify: true,
160                        ..Default::default()
161                    },
162                )
163                .await;
164
165            match sim_result {
166                Ok(simulated) => Ok(Some(SimulationResult {
167                    error: simulated.value.err.map(|e| e.to_string()),
168                    logs: Some(simulated.value.logs.unwrap_or_default()),
169                    compute_units: simulated.value.units_consumed,
170                })),
171                Err(err) => Ok(Some(SimulationResult {
172                    error: Some(err.to_string()),
173                    logs: None,
174                    compute_units: None,
175                })),
176            }
177        }
178        Err(tuktuk_sdk::error::Error::AccountNotFound) => Ok(None),
179        Err(e) => Ok(Some(SimulationResult {
180            error: Some(e.to_string()),
181            logs: None,
182            compute_units: None,
183        })),
184    }
185}
186
187#[derive(Clone, Serialize)]
188struct SimulationResult {
189    pub error: Option<String>,
190    pub logs: Option<Vec<String>>,
191    pub compute_units: Option<u64>,
192}
193
194async fn handle_task_completion(client: &CliClient, task_key: Pubkey, task_id: u16) -> Result {
195    println!(
196        "Task {} completed! Getting transaction signature...",
197        task_id
198    );
199
200    // Get the last 10 transaction signatures for this task
201    let signatures = client
202        .rpc_client
203        .get_signatures_for_address_with_config(
204            &task_key,
205            GetConfirmedSignaturesForAddress2Config {
206                limit: Some(10),
207                ..Default::default()
208            },
209        )
210        .await?;
211
212    if signatures.is_empty() {
213        println!("No transaction signature found for task {}", task_id);
214        return Ok(());
215    }
216
217    // Limit to last 10 transactions
218    let recent_signatures: Vec<solana_sdk::signature::Signature> = signatures
219        .iter()
220        .take(10)
221        .map(|sig_info| sig_info.signature.parse().unwrap())
222        .collect();
223
224    // Get statuses for all signatures at once
225    let signature_statuses = client
226        .rpc_client
227        .get_signature_statuses_with_history(&recent_signatures)
228        .await?;
229
230    // Find the first successful transaction
231    let mut successful_signature = None;
232    for (i, status_result) in signature_statuses.value.iter().enumerate() {
233        match status_result {
234            Some(status) => {
235                // Check if the transaction was successful (no error)
236                if status.err.is_none() {
237                    successful_signature = Some(recent_signatures[i].to_string());
238                    break;
239                }
240            }
241            None => {
242                // Transaction not found, continue to next
243                continue;
244            }
245        }
246    }
247
248    if let Some(signature) = successful_signature {
249        println!("Successful transaction signature: {}", signature);
250
251        // Get the full transaction to extract logs
252        match client
253            .rpc_client
254            .get_transaction_with_config(
255                &signature.parse()?,
256                RpcTransactionConfig {
257                    encoding: Some(UiTransactionEncoding::Json),
258                    max_supported_transaction_version: Some(0),
259                    ..Default::default()
260                },
261            )
262            .await
263        {
264            Ok(tx) => {
265                if let Some(meta) = tx.transaction.meta {
266                    match meta.log_messages {
267                        solana_transaction_status_client_types::option_serializer::OptionSerializer::Some(logs) => {
268                            println!("Transaction logs:");
269                            for log in logs {
270                                println!("  {}", log);
271                            }
272                        }
273                        _ => {
274                            println!("No logs found in transaction");
275                        }
276                    }
277                } else {
278                    println!("No transaction metadata found");
279                }
280            }
281            Err(e) => {
282                println!("Error getting transaction details: {}", e);
283            }
284        }
285    } else {
286        println!(
287            "No successful transaction found for task {} (all {} recent transactions failed)",
288            task_id,
289            recent_signatures.len()
290        );
291    }
292
293    Ok(())
294}
295
296impl TaskCmd {
297    pub async fn run(&self, opts: Opts) -> Result {
298        match &self.cmd {
299            Cmd::List {
300                task_queue,
301                description,
302                skip_simulate,
303                active,
304                limit,
305                successful,
306            } => {
307                let client = opts.client().await?;
308                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
309
310                let task_queue: TaskQueueV0 = client
311                    .as_ref()
312                    .anchor_account(&task_queue_pubkey)
313                    .await?
314                    .ok_or_else(|| anyhow!("Topic account not found"))?;
315                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
316                let tasks = client
317                    .as_ref()
318                    .anchor_accounts::<TaskV0>(&task_keys)
319                    .await?;
320
321                let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
322                let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
323                let now = clock.unix_timestamp;
324
325                let filtered_tasks = tasks
326                    .into_iter()
327                    .filter(|(_, task)| {
328                        if let Some(task) = task {
329                            if let Some(description) = description {
330                                if !task.description.starts_with(description) {
331                                    return false;
332                                }
333                            }
334
335                            // If active flag is set, only show tasks that are active
336                            // Otherwise, show all tasks
337                            return !*active || task.trigger.is_active(now);
338                        }
339                        false
340                    })
341                    .collect::<Vec<_>>();
342
343                let mut json_tasks = Vec::with_capacity(filtered_tasks.len());
344                let mut simulation_tasks = Vec::new();
345
346                for (i, (pubkey, maybe_task)) in filtered_tasks.into_iter().enumerate() {
347                    if let Some(task) = maybe_task {
348                        if !*skip_simulate && task.trigger.is_active(now) {
349                            simulation_tasks.push((i, pubkey));
350                        }
351
352                        json_tasks.push((
353                            i,
354                            Task {
355                                pubkey,
356                                id: task.id,
357                                description: task.description,
358                                trigger: Trigger::from(task.trigger),
359                                crank_reward: task.crank_reward,
360                                rent_refund: task.rent_refund,
361                                simulation_result: None,
362                                transaction: if self.verbose {
363                                    Some(TransactionSource::from(task.transaction.clone()))
364                                } else {
365                                    None
366                                },
367                            },
368                        ));
369
370                        if let Some(limit) = limit {
371                            if json_tasks.len() >= *limit as usize {
372                                break;
373                            }
374                        }
375                    }
376                }
377
378                // Run simulations in parallel with a limit of 10 concurrent tasks
379                let client = Arc::new(client);
380                let simulation_results = futures::stream::iter(simulation_tasks)
381                    .map(|(i, pubkey)| {
382                        let client = client.clone();
383                        async move {
384                            let result = simulate_task(&client, pubkey).await;
385                            (i, result)
386                        }
387                    })
388                    .buffer_unordered(10)
389                    .collect::<Vec<_>>()
390                    .await;
391
392                let mut results = vec![None; json_tasks.len()];
393                for (i, result) in simulation_results {
394                    if let Ok(sim_result) = result {
395                        results[i] = sim_result;
396                    }
397                }
398
399                // Update tasks with simulation results
400                for (i, task) in json_tasks.iter_mut() {
401                    task.simulation_result = results[*i].clone();
402                }
403
404                // Filter by simulation success/failure if requested
405                let mut final_tasks = json_tasks
406                    .into_iter()
407                    .map(|(_, task)| task)
408                    .collect::<Vec<_>>();
409                if let Some(successful) = successful {
410                    final_tasks.retain(|task| {
411                        if let Some(simulation_result) = &task.simulation_result {
412                            (*successful && simulation_result.error.is_none())
413                                || (!*successful && simulation_result.error.is_some())
414                        } else {
415                            !*successful
416                        }
417                    });
418                }
419
420                print_json(&final_tasks)?;
421            }
422            Cmd::Close {
423                task_queue,
424                id: index,
425                description,
426                failed,
427            } => {
428                if index.is_none() && description.is_none() {
429                    return Err(anyhow!("Either id or description must be provided"));
430                }
431                if index.is_some() && description.is_some() {
432                    return Err(anyhow!("Only one of id or description can be provided"));
433                }
434                let client = opts.client().await?;
435                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
436                let task_queue: TaskQueueV0 = client
437                    .as_ref()
438                    .anchor_account(&task_queue_pubkey)
439                    .await?
440                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
441                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
442                let tasks = if let Some(index) = index {
443                    let task_key = tuktuk::task::key(&task_queue_pubkey, *index);
444                    let task = client
445                        .as_ref()
446                        .anchor_account::<TaskV0>(&task_key)
447                        .await?
448                        .ok_or_else(|| anyhow!("Task not found"))?;
449                    vec![(task_key, task)]
450                } else if let Some(description) = description {
451                    let tasks = client
452                        .as_ref()
453                        .anchor_accounts::<TaskV0>(&task_keys)
454                        .await?;
455                    let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
456                    let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
457                    let now = clock.unix_timestamp;
458                    tasks
459                        .into_iter()
460                        .filter(|(_, task)| {
461                            if let Some(task) = task {
462                                if *failed && !task.trigger.is_active(now) {
463                                    return false;
464                                }
465                                return task.description.starts_with(description);
466                            }
467
468                            false
469                        })
470                        .map(|(p, task)| (p, task.unwrap().clone()))
471                        .collect()
472                } else {
473                    vec![]
474                };
475
476                let mut seen_ids = HashSet::new();
477                let mut to_close = Vec::new();
478
479                // If failed flag is set, simulate each task first
480                let client = Arc::new(client);
481                let simulation_tasks = tasks
482                    .iter()
483                    .filter(|(_, task)| seen_ids.insert(task.id))
484                    .map(|(pubkey, _)| *pubkey)
485                    .collect::<Vec<_>>();
486
487                let mut simulation_results = HashMap::new();
488                if *failed {
489                    // Run simulations in parallel with a limit of 10 concurrent tasks
490                    let results = futures::stream::iter(simulation_tasks)
491                        .map(|pubkey| {
492                            let client = client.clone();
493                            async move {
494                                let result = simulate_task(&client, pubkey).await;
495                                (pubkey, result)
496                            }
497                        })
498                        .buffer_unordered(10)
499                        .collect::<Vec<_>>()
500                        .await;
501
502                    // Collect results into a HashMap for O(1) lookups
503                    simulation_results = results.into_iter().collect();
504                }
505
506                // Filter tasks based on simulation results if failed flag is set
507                for (pubkey, task) in &tasks {
508                    if seen_ids.contains(&task.id) {
509                        if *failed {
510                            if let Some(Ok(Some(sim_result))) = simulation_results.get(pubkey) {
511                                if sim_result.error.is_some() {
512                                    to_close.push(task.clone());
513                                }
514                            }
515                        } else {
516                            to_close.push(task.clone());
517                        }
518                    }
519                }
520
521                let ixs = to_close
522                    .into_iter()
523                    .map(|task| {
524                        tuktuk::task::dequeue_ix(
525                            task_queue_pubkey,
526                            client.payer.pubkey(),
527                            task.rent_refund,
528                            task.id,
529                        )
530                        .map_err(|e| anyhow!("Failed to dequeue task: {}", e))
531                    })
532                    .collect::<Result<Vec<_>>>()?;
533
534                let ix_groups = ixs.into_iter().map(|ix| vec![ix]).collect_vec();
535                let groups = pack_instructions_into_transactions(
536                    &ix_groups.iter().map(|ix| ix.as_slice()).collect_vec(),
537                    None,
538                )?;
539
540                for mut to_send in groups {
541                    // Remove compute budget ixs
542                    to_send.instructions.remove(0);
543                    to_send.instructions.remove(0);
544                    send_instructions(
545                        client.rpc_client.clone(),
546                        &client.payer,
547                        client.opts.ws_url().as_str(),
548                        &to_send.instructions,
549                        &[],
550                    )
551                    .await?;
552                }
553            }
554            Cmd::Run {
555                task_queue,
556                id,
557                skip_preflight,
558                description,
559            } => {
560                if id.is_none() && description.is_none() {
561                    return Err(anyhow!("Either id or description must be provided"));
562                }
563                if id.is_some() && description.is_some() {
564                    return Err(anyhow!("Only one of id or description can be provided"));
565                }
566                let client = opts.client().await?;
567                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
568                let task_queue: TaskQueueV0 = client
569                    .as_ref()
570                    .anchor_account(&task_queue_pubkey)
571                    .await?
572                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
573                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
574                let tasks = if let Some(id) = id {
575                    let task_key = tuktuk::task::key(&task_queue_pubkey, *id);
576                    let task = client
577                        .as_ref()
578                        .anchor_account::<TaskV0>(&task_key)
579                        .await?
580                        .ok_or_else(|| anyhow!("Task not found"))?;
581                    vec![(task_key, task)]
582                } else if let Some(description) = description {
583                    let tasks = client
584                        .as_ref()
585                        .anchor_accounts::<TaskV0>(&task_keys)
586                        .await?;
587                    tasks
588                        .into_iter()
589                        .filter(|(_, task)| {
590                            if let Some(task) = task {
591                                return task.description.starts_with(description);
592                            }
593                            false
594                        })
595                        .map(|(p, task)| (p, task.unwrap().clone()))
596                        .collect()
597                } else {
598                    vec![]
599                };
600                for (task_key, _) in tasks {
601                    let run_ix_result = tuktuk_sdk::compiled_transaction::run_ix(
602                        client.as_ref(),
603                        client.as_ref(),
604                        task_key,
605                        client.payer.pubkey(),
606                        &HashSet::new(),
607                    )
608                    .await;
609                    match run_ix_result {
610                        Ok(run_ix) => {
611                            let blockhash = client.rpc_client.get_latest_blockhash().await?;
612                            let (computed, _) = auto_compute_limit_and_price(
613                                &client.rpc_client,
614                                &run_ix.instructions,
615                                1.2,
616                                &client.payer.pubkey(),
617                                Some(blockhash),
618                                Some(run_ix.lookup_tables.clone()),
619                            )
620                            .await
621                            .unwrap();
622
623                            let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
624                            let message = VersionedMessage::V0(v0::Message::try_compile(
625                                &client.payer.pubkey(),
626                                &computed,
627                                &run_ix.lookup_tables,
628                                recent_blockhash,
629                            )?);
630                            let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
631                            let txid = client
632                                .rpc_client
633                                .send_transaction_with_config(
634                                    &tx,
635                                    solana_client::rpc_config::RpcSendTransactionConfig {
636                                        skip_preflight: *skip_preflight,
637                                        preflight_commitment: Some(CommitmentLevel::Confirmed),
638                                        ..Default::default()
639                                    },
640                                )
641                                .await?;
642
643                            println!("Tx sent: {txid}");
644                        }
645                        Err(e) => {
646                            println!("Error running task: {e:?}");
647                        }
648                    }
649                }
650            }
651            Cmd::Requeue {
652                task_queue,
653                id,
654                new_timestamp,
655                stale,
656                description,
657                after_id,
658            } => {
659                let client = opts.client().await?;
660                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
661                let task_queue: TaskQueueV0 = client
662                    .as_ref()
663                    .anchor_account(&task_queue_pubkey)
664                    .await?
665                    .ok_or_else(|| anyhow!("Topic account not found"))?;
666                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
667                let tasks = client
668                    .as_ref()
669                    .anchor_accounts::<TaskV0>(&task_keys)
670                    .await?;
671
672                let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
673                let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
674                let now = clock.unix_timestamp;
675
676                let filtered_tasks = tasks.into_iter().filter(|(_, task)| {
677                    if let Some(task) = task {
678                        if *stale {
679                            let is_stale = task.trigger.is_active(now)
680                                && match task.trigger {
681                                    TriggerV0::Now => false,
682                                    TriggerV0::Timestamp(ts) => {
683                                        now - ts > task_queue.stale_task_age as i64
684                                    }
685                                };
686
687                            if !is_stale {
688                                return false;
689                            }
690                        }
691
692                        if let Some(description) = description {
693                            if !task.description.starts_with(description) {
694                                return false;
695                            }
696                        }
697
698                        if let Some(after_id) = after_id {
699                            if task.id <= *after_id {
700                                return false;
701                            }
702                        }
703
704                        if let Some(id) = id {
705                            if task.id != *id {
706                                return false;
707                            }
708                        }
709
710                        return true;
711                    }
712                    false
713                });
714
715                let collected_tasks = filtered_tasks
716                    .into_iter()
717                    .flat_map(|(_, task)| task)
718                    .collect_vec();
719
720                println!("Requeueing {} tasks", collected_tasks.len());
721
722                for task in collected_tasks {
723                    let (new_task_key, ix) = tuktuk::task::queue(
724                        client.as_ref(),
725                        client.payer.pubkey(),
726                        client.payer.pubkey(),
727                        task_queue_pubkey,
728                        QueueTaskArgsV0 {
729                            id: task.id,
730                            trigger: new_timestamp.map_or(TriggerV0::Now, TriggerV0::Timestamp),
731                            transaction: task.transaction.clone(),
732                            crank_reward: Some(task.crank_reward),
733                            free_tasks: task.free_tasks,
734                            description: task.description,
735                        },
736                    )
737                    .await?;
738
739                    send_instructions(
740                        client.rpc_client.clone(),
741                        &client.payer,
742                        client.opts.ws_url().as_str(),
743                        &[ix],
744                        &[],
745                    )
746                    .await?;
747
748                    println!("New task key: {new_task_key}");
749                }
750            }
751            Cmd::Watch {
752                task_queue,
753                description,
754            } => {
755                if description.is_empty() {
756                    return Err(anyhow!(
757                        "At least one description must be provided for watch command"
758                    ));
759                }
760
761                let client = opts.client().await?;
762                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
763                let task_queue: TaskQueueV0 = client
764                    .as_ref()
765                    .anchor_account(&task_queue_pubkey)
766                    .await?
767                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
768
769                let trimmed_descriptions: Vec<String> = description
770                    .iter()
771                    .map(|prefix| {
772                        if prefix.len() > 40 {
773                            prefix.chars().take(40).collect()
774                        } else {
775                            prefix.clone()
776                        }
777                    })
778                    .collect();
779
780                // First, get and display all existing tasks that match the description prefixes
781                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
782                let existing_tasks = client
783                    .as_ref()
784                    .anchor_accounts::<TaskV0>(&task_keys)
785                    .await?;
786
787                let mut watched_tasks = std::collections::HashMap::new();
788
789                // Filter and start watching existing tasks that match our prefixes
790                for (task_key, maybe_task) in existing_tasks {
791                    if let Some(task) = maybe_task {
792                        // Check if task description matches any of the prefixes
793                        let matches = trimmed_descriptions
794                            .iter()
795                            .any(|prefix| task.description.starts_with(prefix));
796                        if matches {
797                            println!(
798                                "Found existing matching task: {} (ID: {}, KEY: {})",
799                                task.description, task.id, task_key
800                            );
801                            watched_tasks.insert(task_key, task.id);
802                        }
803                    }
804                }
805
806                // Set up pubsub tracker for watching
807                let (pubsub_client_raw, _pubsub_handle, _shutdown_sender) =
808                    tuktuk_sdk::pubsub_client::PubsubClient::new(client.opts.ws_url().as_str())
809                        .await?;
810                let pubsub_client = Arc::new(pubsub_client_raw);
811                let pubsub_tracker = Arc::new(tuktuk_sdk::watcher::PubsubTracker::new(
812                    client.rpc_client.clone(),
813                    pubsub_client,
814                    Duration::from_secs(30),
815                    solana_sdk::commitment_config::CommitmentConfig::confirmed(),
816                ));
817
818                // Start watching for task updates
819                let (stream, _unsub) = tuktuk::task::on_new(
820                    client.as_ref(),
821                    &pubsub_tracker,
822                    &task_queue_pubkey,
823                    &task_queue,
824                )
825                .await?;
826                println!(
827                    "Watching for tasks with description prefixes: {:?}",
828                    trimmed_descriptions
829                );
830                println!("Press Ctrl+C to stop watching...");
831
832                let mut stream = Box::pin(stream);
833
834                while let Some(update) = stream.next().await {
835                    match update {
836                        Ok(task_update) => {
837                            // Check for new tasks that match any of our descriptions
838                            for (task_key, maybe_task) in task_update.tasks {
839                                if let Some(task) = maybe_task {
840                                    // Check if task description matches any of the prefixes
841                                    let matches = trimmed_descriptions
842                                        .iter()
843                                        .any(|prefix| task.description.starts_with(prefix));
844                                    if matches {
845                                        println!(
846                                            "Found matching task: {} (ID: {}, KEY: {})",
847                                            task.description, task.id, task_key
848                                        );
849                                        watched_tasks.insert(task_key, task.id);
850                                    }
851                                } else {
852                                    // Task was removed (completed)
853                                    if let Some(task_id) = watched_tasks.remove(&task_key) {
854                                        if let Err(e) =
855                                            handle_task_completion(&client, task_key, task_id).await
856                                        {
857                                            eprintln!("Error handling task completion: {}", e);
858                                        }
859                                    }
860                                }
861                            }
862
863                            // Check for removed tasks
864                            for removed_task_key in task_update.removed {
865                                if let Some(task_id) = watched_tasks.remove(&removed_task_key) {
866                                    if let Err(e) =
867                                        handle_task_completion(&client, removed_task_key, task_id)
868                                            .await
869                                    {
870                                        eprintln!("Error handling task completion: {}", e);
871                                    }
872                                }
873                            }
874                        }
875                        Err(e) => {
876                            eprintln!("Error receiving task update: {}", e);
877                        }
878                    }
879                }
880            }
881        }
882        Ok(())
883    }
884}
885
886#[derive(Serialize)]
887struct Task {
888    #[serde(with = "serde_pubkey")]
889    pub pubkey: Pubkey,
890    pub id: u16,
891    pub description: String,
892    #[serde(with = "serde_pubkey")]
893    pub rent_refund: Pubkey,
894    pub trigger: Trigger,
895    pub crank_reward: u64,
896    pub simulation_result: Option<SimulationResult>,
897    pub transaction: Option<TransactionSource>,
898}
899
900#[derive(Serialize)]
901enum Trigger {
902    Now,
903    Timestamp {
904        epoch: i64,
905        #[serde(rename = "human_readable")]
906        formatted: String,
907    },
908}
909
910impl From<TriggerV0> for Trigger {
911    fn from(trigger: TriggerV0) -> Self {
912        match trigger {
913            TriggerV0::Now => Trigger::Now,
914            TriggerV0::Timestamp(ts) => Trigger::Timestamp {
915                epoch: ts,
916                formatted: Local
917                    .timestamp_opt(ts, 0)
918                    .single()
919                    .unwrap_or_else(Local::now)
920                    .to_rfc3339(),
921            },
922        }
923    }
924}