tuktuk_cli/cmd/
task.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4    time::Duration,
5};
6
7use anyhow::anyhow;
8use chrono::{Local, TimeZone};
9use clap::{Args, Subcommand};
10use clock::SYSVAR_CLOCK;
11use futures::stream::StreamExt;
12use itertools::Itertools;
13use serde::Serialize;
14use solana_client::{
15    rpc_client::GetConfirmedSignaturesForAddress2Config,
16    rpc_config::{RpcSimulateTransactionConfig, RpcTransactionConfig},
17};
18use solana_sdk::{
19    commitment_config::CommitmentLevel,
20    message::{v0, VersionedMessage},
21    pubkey::Pubkey,
22    signer::Signer,
23    transaction::VersionedTransaction,
24};
25use solana_transaction_status_client_types::UiTransactionEncoding;
26use solana_transaction_utils::{
27    pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
28};
29use tuktuk_program::{
30    types::{QueueTaskArgsV0, TriggerV0},
31    TaskQueueV0, TaskV0,
32};
33use tuktuk_sdk::prelude::*;
34
35use super::{task_queue::TaskQueueArg, TransactionSource};
36use crate::{
37    client::{send_instructions, CliClient},
38    cmd::Opts,
39    result::Result,
40    serde::{print_json, serde_pubkey},
41};
42
43#[derive(Debug, Args)]
44pub struct TaskCmd {
45    #[arg(long, default_value = "false")]
46    pub verbose: bool,
47    #[command(subcommand)]
48    pub cmd: Cmd,
49}
50
51#[derive(Debug, Subcommand)]
52pub enum Cmd {
53    List {
54        #[command(flatten)]
55        task_queue: TaskQueueArg,
56        // Description prefix for the task to filter by
57        #[arg(long)]
58        description: Option<String>,
59        #[arg(long, default_value = "false")]
60        skip_simulate: bool,
61        #[arg(
62            long,
63            help = "Only show tasks that could be executed now",
64            default_value = "false"
65        )]
66        active: bool,
67        #[arg(long, help = "Show tasks with a succesful/failed simulation")]
68        successful: Option<bool>,
69        #[arg(long, help = "Limit the number of tasks returned")]
70        limit: Option<u32>,
71    },
72    Run {
73        #[command(flatten)]
74        task_queue: TaskQueueArg,
75        #[arg(short, long)]
76        id: Option<u16>,
77        // Description prefix to run by
78        #[arg(long)]
79        description: Option<String>,
80        #[arg(short, long, default_value = "false")]
81        skip_preflight: bool,
82    },
83    Requeue {
84        #[command(flatten)]
85        task_queue: TaskQueueArg,
86        #[arg(short, long)]
87        id: Option<u16>,
88        #[arg(short, long, default_value = "false", help = "Requeue all stale tasks")]
89        stale: bool,
90        #[arg(long)]
91        description: Option<String>,
92        #[arg(long)]
93        after_id: Option<u16>,
94        #[arg(long)]
95        new_timestamp: Option<i64>,
96    },
97    Close {
98        #[command(flatten)]
99        task_queue: TaskQueueArg,
100        #[arg(short, long)]
101        id: Option<u16>,
102        // Description prefix to close by
103        #[arg(long)]
104        description: Option<String>,
105        #[arg(
106            long,
107            default_value = "false",
108            help = "Close tasks that fail simulation"
109        )]
110        failed: bool,
111    },
112    Watch {
113        #[command(flatten)]
114        task_queue: TaskQueueArg,
115        #[arg(
116            long,
117            help = "Description prefix to watch for (can be specified multiple times)"
118        )]
119        description: Vec<String>,
120    },
121}
122
123async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<SimulationResult>> {
124    // Get the run instruction
125    let run_ix_res = tuktuk_sdk::compiled_transaction::run_ix(
126        client.as_ref(),
127        task_key,
128        client.payer.pubkey(),
129        &HashSet::new(),
130    )
131    .await;
132
133    match run_ix_res {
134        Ok(run_ix) => {
135            // Create and simulate the transaction
136            let mut updated_instructions = vec![
137                solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(
138                    1900000,
139                ),
140            ];
141            updated_instructions.extend(run_ix.instructions.clone());
142            let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
143            let message = VersionedMessage::V0(v0::Message::try_compile(
144                &client.payer.pubkey(),
145                &updated_instructions,
146                &run_ix.lookup_tables,
147                recent_blockhash,
148            )?);
149            let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
150            let sim_result = client
151                .rpc_client
152                .simulate_transaction_with_config(
153                    &tx,
154                    RpcSimulateTransactionConfig {
155                        commitment: Some(
156                            solana_sdk::commitment_config::CommitmentConfig::confirmed(),
157                        ),
158                        sig_verify: true,
159                        ..Default::default()
160                    },
161                )
162                .await;
163
164            match sim_result {
165                Ok(simulated) => Ok(Some(SimulationResult {
166                    error: simulated.value.err.map(|e| e.to_string()),
167                    logs: Some(simulated.value.logs.unwrap_or_default()),
168                    compute_units: simulated.value.units_consumed,
169                })),
170                Err(err) => Ok(Some(SimulationResult {
171                    error: Some(err.to_string()),
172                    logs: None,
173                    compute_units: None,
174                })),
175            }
176        }
177        Err(tuktuk_sdk::error::Error::AccountNotFound) => Ok(None),
178        Err(e) => Ok(Some(SimulationResult {
179            error: Some(e.to_string()),
180            logs: None,
181            compute_units: None,
182        })),
183    }
184}
185
186#[derive(Clone, Serialize)]
187struct SimulationResult {
188    pub error: Option<String>,
189    pub logs: Option<Vec<String>>,
190    pub compute_units: Option<u64>,
191}
192
193async fn handle_task_completion(client: &CliClient, task_key: Pubkey, task_id: u16) -> Result {
194    println!(
195        "Task {} completed! Getting transaction signature...",
196        task_id
197    );
198
199    // Get the last 10 transaction signatures for this task
200    let signatures = client
201        .rpc_client
202        .get_signatures_for_address_with_config(
203            &task_key,
204            GetConfirmedSignaturesForAddress2Config {
205                limit: Some(10),
206                ..Default::default()
207            },
208        )
209        .await?;
210
211    if signatures.is_empty() {
212        println!("No transaction signature found for task {}", task_id);
213        return Ok(());
214    }
215
216    // Limit to last 10 transactions
217    let recent_signatures: Vec<solana_sdk::signature::Signature> = signatures
218        .iter()
219        .take(10)
220        .map(|sig_info| sig_info.signature.parse().unwrap())
221        .collect();
222
223    // Get statuses for all signatures at once
224    let signature_statuses = client
225        .rpc_client
226        .get_signature_statuses_with_history(&recent_signatures)
227        .await?;
228
229    // Find the first successful transaction
230    let mut successful_signature = None;
231    for (i, status_result) in signature_statuses.value.iter().enumerate() {
232        match status_result {
233            Some(status) => {
234                // Check if the transaction was successful (no error)
235                if status.err.is_none() {
236                    successful_signature = Some(recent_signatures[i].to_string());
237                    break;
238                }
239            }
240            None => {
241                // Transaction not found, continue to next
242                continue;
243            }
244        }
245    }
246
247    if let Some(signature) = successful_signature {
248        println!("Successful transaction signature: {}", signature);
249
250        // Get the full transaction to extract logs
251        match client
252            .rpc_client
253            .get_transaction_with_config(
254                &signature.parse()?,
255                RpcTransactionConfig {
256                    encoding: Some(UiTransactionEncoding::Json),
257                    max_supported_transaction_version: Some(0),
258                    ..Default::default()
259                },
260            )
261            .await
262        {
263            Ok(tx) => {
264                if let Some(meta) = tx.transaction.meta {
265                    match meta.log_messages {
266                        solana_transaction_status_client_types::option_serializer::OptionSerializer::Some(logs) => {
267                            println!("Transaction logs:");
268                            for log in logs {
269                                println!("  {}", log);
270                            }
271                        }
272                        _ => {
273                            println!("No logs found in transaction");
274                        }
275                    }
276                } else {
277                    println!("No transaction metadata found");
278                }
279            }
280            Err(e) => {
281                println!("Error getting transaction details: {}", e);
282            }
283        }
284    } else {
285        println!(
286            "No successful transaction found for task {} (all {} recent transactions failed)",
287            task_id,
288            recent_signatures.len()
289        );
290    }
291
292    Ok(())
293}
294
295impl TaskCmd {
296    pub async fn run(&self, opts: Opts) -> Result {
297        match &self.cmd {
298            Cmd::List {
299                task_queue,
300                description,
301                skip_simulate,
302                active,
303                limit,
304                successful,
305            } => {
306                let client = opts.client().await?;
307                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
308
309                let task_queue: TaskQueueV0 = client
310                    .as_ref()
311                    .anchor_account(&task_queue_pubkey)
312                    .await?
313                    .ok_or_else(|| anyhow!("Topic account not found"))?;
314                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
315                let tasks = client
316                    .as_ref()
317                    .anchor_accounts::<TaskV0>(&task_keys)
318                    .await?;
319
320                let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
321                let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
322                let now = clock.unix_timestamp;
323
324                let filtered_tasks = tasks
325                    .into_iter()
326                    .filter(|(_, task)| {
327                        if let Some(task) = task {
328                            if let Some(description) = description {
329                                if !task.description.starts_with(description) {
330                                    return false;
331                                }
332                            }
333
334                            // If active flag is set, only show tasks that are active
335                            // Otherwise, show all tasks
336                            return !*active || task.trigger.is_active(now);
337                        }
338                        false
339                    })
340                    .collect::<Vec<_>>();
341
342                let mut json_tasks = Vec::with_capacity(filtered_tasks.len());
343                let mut simulation_tasks = Vec::new();
344
345                for (i, (pubkey, maybe_task)) in filtered_tasks.into_iter().enumerate() {
346                    if let Some(task) = maybe_task {
347                        if !*skip_simulate && task.trigger.is_active(now) {
348                            simulation_tasks.push((i, pubkey));
349                        }
350
351                        json_tasks.push((
352                            i,
353                            Task {
354                                pubkey,
355                                id: task.id,
356                                description: task.description,
357                                trigger: Trigger::from(task.trigger),
358                                crank_reward: task.crank_reward,
359                                rent_refund: task.rent_refund,
360                                simulation_result: None,
361                                transaction: if self.verbose {
362                                    Some(TransactionSource::from(task.transaction.clone()))
363                                } else {
364                                    None
365                                },
366                            },
367                        ));
368
369                        if let Some(limit) = limit {
370                            if json_tasks.len() >= *limit as usize {
371                                break;
372                            }
373                        }
374                    }
375                }
376
377                // Run simulations in parallel with a limit of 10 concurrent tasks
378                let client = Arc::new(client);
379                let simulation_results = futures::stream::iter(simulation_tasks)
380                    .map(|(i, pubkey)| {
381                        let client = client.clone();
382                        async move {
383                            let result = simulate_task(&client, pubkey).await;
384                            (i, result)
385                        }
386                    })
387                    .buffer_unordered(10)
388                    .collect::<Vec<_>>()
389                    .await;
390
391                let mut results = vec![None; json_tasks.len()];
392                for (i, result) in simulation_results {
393                    if let Ok(sim_result) = result {
394                        results[i] = sim_result;
395                    }
396                }
397
398                // Update tasks with simulation results
399                for (i, task) in json_tasks.iter_mut() {
400                    task.simulation_result = results[*i].clone();
401                }
402
403                // Filter by simulation success/failure if requested
404                let mut final_tasks = json_tasks
405                    .into_iter()
406                    .map(|(_, task)| task)
407                    .collect::<Vec<_>>();
408                if let Some(successful) = successful {
409                    final_tasks.retain(|task| {
410                        if let Some(simulation_result) = &task.simulation_result {
411                            (*successful && simulation_result.error.is_none())
412                                || (!*successful && simulation_result.error.is_some())
413                        } else {
414                            !*successful
415                        }
416                    });
417                }
418
419                print_json(&final_tasks)?;
420            }
421            Cmd::Close {
422                task_queue,
423                id: index,
424                description,
425                failed,
426            } => {
427                if index.is_none() && description.is_none() {
428                    return Err(anyhow!("Either id or description must be provided"));
429                }
430                if index.is_some() && description.is_some() {
431                    return Err(anyhow!("Only one of id or description can be provided"));
432                }
433                let client = opts.client().await?;
434                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
435                let task_queue: TaskQueueV0 = client
436                    .as_ref()
437                    .anchor_account(&task_queue_pubkey)
438                    .await?
439                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
440                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
441                let tasks = if let Some(index) = index {
442                    let task_key = tuktuk::task::key(&task_queue_pubkey, *index);
443                    let task = client
444                        .as_ref()
445                        .anchor_account::<TaskV0>(&task_key)
446                        .await?
447                        .ok_or_else(|| anyhow!("Task not found"))?;
448                    vec![(task_key, task)]
449                } else if let Some(description) = description {
450                    let tasks = client
451                        .as_ref()
452                        .anchor_accounts::<TaskV0>(&task_keys)
453                        .await?;
454                    let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
455                    let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
456                    let now = clock.unix_timestamp;
457                    tasks
458                        .into_iter()
459                        .filter(|(_, task)| {
460                            if let Some(task) = task {
461                                if *failed && !task.trigger.is_active(now) {
462                                    return false;
463                                }
464                                return task.description.starts_with(description);
465                            }
466
467                            false
468                        })
469                        .map(|(p, task)| (p, task.unwrap().clone()))
470                        .collect()
471                } else {
472                    vec![]
473                };
474
475                let mut seen_ids = HashSet::new();
476                let mut to_close = Vec::new();
477
478                // If failed flag is set, simulate each task first
479                let client = Arc::new(client);
480                let simulation_tasks = tasks
481                    .iter()
482                    .filter(|(_, task)| seen_ids.insert(task.id))
483                    .map(|(pubkey, _)| *pubkey)
484                    .collect::<Vec<_>>();
485
486                let mut simulation_results = HashMap::new();
487                if *failed {
488                    // Run simulations in parallel with a limit of 10 concurrent tasks
489                    let results = futures::stream::iter(simulation_tasks)
490                        .map(|pubkey| {
491                            let client = client.clone();
492                            async move {
493                                let result = simulate_task(&client, pubkey).await;
494                                (pubkey, result)
495                            }
496                        })
497                        .buffer_unordered(10)
498                        .collect::<Vec<_>>()
499                        .await;
500
501                    // Collect results into a HashMap for O(1) lookups
502                    simulation_results = results.into_iter().collect();
503                }
504
505                // Filter tasks based on simulation results if failed flag is set
506                for (pubkey, task) in &tasks {
507                    if seen_ids.contains(&task.id) {
508                        if *failed {
509                            if let Some(Ok(Some(sim_result))) = simulation_results.get(pubkey) {
510                                if sim_result.error.is_some() {
511                                    to_close.push(task.clone());
512                                }
513                            }
514                        } else {
515                            to_close.push(task.clone());
516                        }
517                    }
518                }
519
520                let ixs = to_close
521                    .into_iter()
522                    .map(|task| {
523                        tuktuk::task::dequeue_ix(
524                            task_queue_pubkey,
525                            client.payer.pubkey(),
526                            task.rent_refund,
527                            task.id,
528                        )
529                        .map_err(|e| anyhow!("Failed to dequeue task: {}", e))
530                    })
531                    .collect::<Result<Vec<_>>>()?;
532
533                let ix_groups = ixs.into_iter().map(|ix| vec![ix]).collect_vec();
534                let groups = pack_instructions_into_transactions(
535                    &ix_groups.iter().map(|ix| ix.as_slice()).collect_vec(),
536                    None,
537                )?;
538
539                for mut to_send in groups {
540                    // Remove compute budget ixs
541                    to_send.instructions.remove(0);
542                    to_send.instructions.remove(0);
543                    send_instructions(
544                        client.rpc_client.clone(),
545                        &client.payer,
546                        client.opts.ws_url().as_str(),
547                        &to_send.instructions,
548                        &[],
549                    )
550                    .await?;
551                }
552            }
553            Cmd::Run {
554                task_queue,
555                id,
556                skip_preflight,
557                description,
558            } => {
559                if id.is_none() && description.is_none() {
560                    return Err(anyhow!("Either id or description must be provided"));
561                }
562                if id.is_some() && description.is_some() {
563                    return Err(anyhow!("Only one of id or description can be provided"));
564                }
565                let client = opts.client().await?;
566                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
567                let task_queue: TaskQueueV0 = client
568                    .as_ref()
569                    .anchor_account(&task_queue_pubkey)
570                    .await?
571                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
572                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
573                let tasks = if let Some(id) = id {
574                    let task_key = tuktuk::task::key(&task_queue_pubkey, *id);
575                    let task = client
576                        .as_ref()
577                        .anchor_account::<TaskV0>(&task_key)
578                        .await?
579                        .ok_or_else(|| anyhow!("Task not found"))?;
580                    vec![(task_key, task)]
581                } else if let Some(description) = description {
582                    let tasks = client
583                        .as_ref()
584                        .anchor_accounts::<TaskV0>(&task_keys)
585                        .await?;
586                    tasks
587                        .into_iter()
588                        .filter(|(_, task)| {
589                            if let Some(task) = task {
590                                return task.description.starts_with(description);
591                            }
592                            false
593                        })
594                        .map(|(p, task)| (p, task.unwrap().clone()))
595                        .collect()
596                } else {
597                    vec![]
598                };
599                for (task_key, _) in tasks {
600                    let run_ix_result = tuktuk_sdk::compiled_transaction::run_ix(
601                        client.as_ref(),
602                        task_key,
603                        client.payer.pubkey(),
604                        &HashSet::new(),
605                    )
606                    .await;
607                    match run_ix_result {
608                        Ok(run_ix) => {
609                            let blockhash = client.rpc_client.get_latest_blockhash().await?;
610                            let (computed, _) = auto_compute_limit_and_price(
611                                &client.rpc_client,
612                                &run_ix.instructions,
613                                1.2,
614                                &client.payer.pubkey(),
615                                Some(blockhash),
616                                Some(run_ix.lookup_tables.clone()),
617                            )
618                            .await
619                            .unwrap();
620
621                            let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
622                            let message = VersionedMessage::V0(v0::Message::try_compile(
623                                &client.payer.pubkey(),
624                                &computed,
625                                &run_ix.lookup_tables,
626                                recent_blockhash,
627                            )?);
628                            let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
629                            let txid = client
630                                .rpc_client
631                                .send_transaction_with_config(
632                                    &tx,
633                                    solana_client::rpc_config::RpcSendTransactionConfig {
634                                        skip_preflight: *skip_preflight,
635                                        preflight_commitment: Some(CommitmentLevel::Confirmed),
636                                        ..Default::default()
637                                    },
638                                )
639                                .await?;
640
641                            println!("Tx sent: {txid}");
642                        }
643                        Err(e) => {
644                            println!("Error running task: {e:?}");
645                        }
646                    }
647                }
648            }
649            Cmd::Requeue {
650                task_queue,
651                id,
652                new_timestamp,
653                stale,
654                description,
655                after_id,
656            } => {
657                let client = opts.client().await?;
658                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
659                let task_queue: TaskQueueV0 = client
660                    .as_ref()
661                    .anchor_account(&task_queue_pubkey)
662                    .await?
663                    .ok_or_else(|| anyhow!("Topic account not found"))?;
664                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
665                let tasks = client
666                    .as_ref()
667                    .anchor_accounts::<TaskV0>(&task_keys)
668                    .await?;
669
670                let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
671                let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
672                let now = clock.unix_timestamp;
673
674                let filtered_tasks = tasks.into_iter().filter(|(_, task)| {
675                    if let Some(task) = task {
676                        if *stale {
677                            let is_stale = task.trigger.is_active(now)
678                                && match task.trigger {
679                                    TriggerV0::Now => false,
680                                    TriggerV0::Timestamp(ts) => {
681                                        now - ts > task_queue.stale_task_age as i64
682                                    }
683                                };
684
685                            if !is_stale {
686                                return false;
687                            }
688                        }
689
690                        if let Some(description) = description {
691                            if !task.description.starts_with(description) {
692                                return false;
693                            }
694                        }
695
696                        if let Some(after_id) = after_id {
697                            if task.id <= *after_id {
698                                return false;
699                            }
700                        }
701
702                        if let Some(id) = id {
703                            if task.id != *id {
704                                return false;
705                            }
706                        }
707
708                        return true;
709                    }
710                    false
711                });
712
713                let collected_tasks = filtered_tasks
714                    .into_iter()
715                    .flat_map(|(_, task)| task)
716                    .collect_vec();
717
718                println!("Requeueing {} tasks", collected_tasks.len());
719
720                for task in collected_tasks {
721                    let (new_task_key, ix) = tuktuk::task::queue(
722                        client.as_ref(),
723                        client.payer.pubkey(),
724                        client.payer.pubkey(),
725                        task_queue_pubkey,
726                        QueueTaskArgsV0 {
727                            id: task.id,
728                            trigger: new_timestamp.map_or(TriggerV0::Now, TriggerV0::Timestamp),
729                            transaction: task.transaction.clone(),
730                            crank_reward: Some(task.crank_reward),
731                            free_tasks: task.free_tasks,
732                            description: task.description,
733                        },
734                    )
735                    .await?;
736
737                    send_instructions(
738                        client.rpc_client.clone(),
739                        &client.payer,
740                        client.opts.ws_url().as_str(),
741                        &[ix],
742                        &[],
743                    )
744                    .await?;
745
746                    println!("New task key: {new_task_key}");
747                }
748            }
749            Cmd::Watch {
750                task_queue,
751                description,
752            } => {
753                if description.is_empty() {
754                    return Err(anyhow!(
755                        "At least one description must be provided for watch command"
756                    ));
757                }
758
759                let client = opts.client().await?;
760                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
761                let task_queue: TaskQueueV0 = client
762                    .as_ref()
763                    .anchor_account(&task_queue_pubkey)
764                    .await?
765                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
766
767                let trimmed_descriptions: Vec<String> = description
768                    .iter()
769                    .map(|prefix| {
770                        if prefix.len() > 40 {
771                            prefix.chars().take(40).collect()
772                        } else {
773                            prefix.clone()
774                        }
775                    })
776                    .collect();
777
778                // First, get and display all existing tasks that match the description prefixes
779                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
780                let existing_tasks = client
781                    .as_ref()
782                    .anchor_accounts::<TaskV0>(&task_keys)
783                    .await?;
784
785                let mut watched_tasks = std::collections::HashMap::new();
786
787                // Filter and start watching existing tasks that match our prefixes
788                for (task_key, maybe_task) in existing_tasks {
789                    if let Some(task) = maybe_task {
790                        // Check if task description matches any of the prefixes
791                        let matches = trimmed_descriptions
792                            .iter()
793                            .any(|prefix| task.description.starts_with(prefix));
794                        if matches {
795                            println!(
796                                "Found existing matching task: {} (ID: {}, KEY: {})",
797                                task.description, task.id, task_key
798                            );
799                            watched_tasks.insert(task_key, task.id);
800                        }
801                    }
802                }
803
804                // Set up pubsub tracker for watching
805                let (pubsub_client_raw, _pubsub_handle, _shutdown_sender) =
806                    tuktuk_sdk::pubsub_client::PubsubClient::new(client.opts.ws_url().as_str())
807                        .await?;
808                let pubsub_client = Arc::new(pubsub_client_raw);
809                let pubsub_tracker = Arc::new(tuktuk_sdk::watcher::PubsubTracker::new(
810                    client.rpc_client.clone(),
811                    pubsub_client,
812                    Duration::from_secs(30),
813                    solana_sdk::commitment_config::CommitmentConfig::confirmed(),
814                ));
815
816                // Start watching for task updates
817                let (stream, _unsub) = tuktuk::task::on_new(
818                    client.as_ref(),
819                    &pubsub_tracker,
820                    &task_queue_pubkey,
821                    &task_queue,
822                )
823                .await?;
824                println!(
825                    "Watching for tasks with description prefixes: {:?}",
826                    trimmed_descriptions
827                );
828                println!("Press Ctrl+C to stop watching...");
829
830                let mut stream = Box::pin(stream);
831
832                while let Some(update) = stream.next().await {
833                    match update {
834                        Ok(task_update) => {
835                            // Check for new tasks that match any of our descriptions
836                            for (task_key, maybe_task) in task_update.tasks {
837                                if let Some(task) = maybe_task {
838                                    // Check if task description matches any of the prefixes
839                                    let matches = trimmed_descriptions
840                                        .iter()
841                                        .any(|prefix| task.description.starts_with(prefix));
842                                    if matches {
843                                        println!(
844                                            "Found matching task: {} (ID: {}, KEY: {})",
845                                            task.description, task.id, task_key
846                                        );
847                                        watched_tasks.insert(task_key, task.id);
848                                    }
849                                } else {
850                                    // Task was removed (completed)
851                                    if let Some(task_id) = watched_tasks.remove(&task_key) {
852                                        if let Err(e) =
853                                            handle_task_completion(&client, task_key, task_id).await
854                                        {
855                                            eprintln!("Error handling task completion: {}", e);
856                                        }
857                                    }
858                                }
859                            }
860
861                            // Check for removed tasks
862                            for removed_task_key in task_update.removed {
863                                if let Some(task_id) = watched_tasks.remove(&removed_task_key) {
864                                    if let Err(e) =
865                                        handle_task_completion(&client, removed_task_key, task_id)
866                                            .await
867                                    {
868                                        eprintln!("Error handling task completion: {}", e);
869                                    }
870                                }
871                            }
872                        }
873                        Err(e) => {
874                            eprintln!("Error receiving task update: {}", e);
875                        }
876                    }
877                }
878            }
879        }
880        Ok(())
881    }
882}
883
/// Serializable record describing a single task; the `Serialize` derive lets it
/// be emitted through serde.
884#[derive(Serialize)]
885struct Task {
    /// Public key associated with the task, (de)serialized via the `serde_pubkey`
    /// helper module. Presumably the task account's address — confirm at the
    /// construction site, which is outside this view.
886    #[serde(with = "serde_pubkey")]
887    pub pubkey: Pubkey,
    /// Numeric task id (presumably mirrors `TaskV0::id` — confirm).
888    pub id: u16,
    /// Task description text.
889    pub description: String,
    /// Public key serialized via `serde_pubkey`. Presumably the account that
    /// receives the refunded rent — TODO confirm against the caller.
890    #[serde(with = "serde_pubkey")]
891    pub rent_refund: Pubkey,
    /// Trigger of the task in its serializable `Trigger` form.
892    pub trigger: Trigger,
    /// Crank reward amount; units are not visible from this definition.
893    pub crank_reward: u64,
    /// Optional simulation outcome. NOTE(review): presumably `None` when
    /// simulation was skipped — verify where this struct is built.
894    pub simulation_result: Option<SimulationResult>,
    /// Optional transaction source attached to the task.
895    pub transaction: Option<TransactionSource>,
896}
897
/// Serializable form of a task trigger, produced from `TriggerV0` by the
/// `From<TriggerV0>` implementation in this file.
898#[derive(Serialize)]
899enum Trigger {
    /// Counterpart of `TriggerV0::Now`.
900    Now,
    /// Counterpart of `TriggerV0::Timestamp`.
901    Timestamp {
        /// Raw value carried by `TriggerV0::Timestamp`. It is passed as the
        /// seconds argument of `timestamp_opt`, i.e. treated as Unix seconds.
902        epoch: i64,
        /// Human-readable rendering of `epoch`; serialized under the key
        /// `human_readable` rather than `formatted`.
903        #[serde(rename = "human_readable")]
904        formatted: String,
905    },
906}
907
908impl From<TriggerV0> for Trigger {
909    fn from(trigger: TriggerV0) -> Self {
910        match trigger {
911            TriggerV0::Now => Trigger::Now,
912            TriggerV0::Timestamp(ts) => Trigger::Timestamp {
913                epoch: ts,
914                formatted: Local
915                    .timestamp_opt(ts, 0)
916                    .single()
917                    .unwrap_or_else(Local::now)
918                    .to_rfc3339(),
919            },
920        }
921    }
922}