tuktuk_cli/cmd/
task.rs

1use std::collections::HashSet;
2
3use anyhow::anyhow;
4use clap::{Args, Subcommand};
5use clock::SYSVAR_CLOCK;
6use itertools::Itertools;
7use serde::Serialize;
8use solana_client::rpc_config::RpcSimulateTransactionConfig;
9use solana_sdk::{
10    commitment_config::CommitmentLevel,
11    message::{v0, VersionedMessage},
12    pubkey::Pubkey,
13    signer::Signer,
14    transaction::VersionedTransaction,
15};
16use solana_transaction_utils::{
17    pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
18};
19use tuktuk_program::{types::TriggerV0, TaskQueueV0, TaskV0};
20use tuktuk_sdk::prelude::*;
21
22use super::{task_queue::TaskQueueArg, TransactionSource};
23use crate::{
24    client::{send_instructions, CliClient},
25    cmd::Opts,
26    result::Result,
27    serde::{print_json, serde_pubkey},
28};
29
30#[derive(Debug, Args)]
31pub struct TaskCmd {
32    #[arg(long, default_value = "false")]
33    pub verbose: bool,
34    #[command(subcommand)]
35    pub cmd: Cmd,
36}
37
38#[derive(Debug, Subcommand)]
39pub enum Cmd {
40    List {
41        #[command(flatten)]
42        task_queue: TaskQueueArg,
43        // Description prefix for the task to filter by
44        #[arg(long)]
45        description: Option<String>,
46        #[arg(long, default_value = "false")]
47        skip_simulate: bool,
48    },
49    Run {
50        #[command(flatten)]
51        task_queue: TaskQueueArg,
52        #[arg(short, long)]
53        id: Option<u16>,
54        // Description prefix to run by
55        #[arg(long)]
56        description: Option<String>,
57        #[arg(short, long, default_value = "false")]
58        skip_preflight: bool,
59    },
60    Close {
61        #[command(flatten)]
62        task_queue: TaskQueueArg,
63        #[arg(short, long)]
64        id: Option<u16>,
65        // Description prefix to close by
66        #[arg(long)]
67        description: Option<String>,
68        #[arg(
69            long,
70            default_value = "false",
71            help = "Close tasks that fail simulation"
72        )]
73        failed: bool,
74    },
75}
76
77async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<SimulationResult>> {
78    // Get the run instruction
79    let run_ix = tuktuk_sdk::compiled_transaction::run_ix(
80        client.as_ref(),
81        task_key,
82        client.payer.pubkey(),
83        &HashSet::new(),
84    )
85    .await?;
86
87    if let Some(run_ix) = run_ix {
88        // Create and simulate the transaction
89        let mut updated_instructions = vec![
90            solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(1900000),
91        ];
92        updated_instructions.extend(run_ix.instructions.clone());
93        let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
94        let message = VersionedMessage::V0(v0::Message::try_compile(
95            &client.payer.pubkey(),
96            &updated_instructions,
97            &run_ix.lookup_tables,
98            recent_blockhash,
99        )?);
100        let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
101        let sim_result = client
102            .rpc_client
103            .simulate_transaction_with_config(
104                &tx,
105                RpcSimulateTransactionConfig {
106                    commitment: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()),
107                    sig_verify: true,
108                    ..Default::default()
109                },
110            )
111            .await;
112
113        match sim_result {
114            Ok(simulated) => Ok(Some(SimulationResult {
115                error: simulated.value.err.map(|e| e.to_string()),
116                logs: Some(simulated.value.logs.unwrap_or_default()),
117                compute_units: simulated.value.units_consumed,
118            })),
119            Err(err) => Ok(Some(SimulationResult {
120                error: Some(err.to_string()),
121                logs: None,
122                compute_units: None,
123            })),
124        }
125    } else {
126        Ok(None)
127    }
128}
129
130impl TaskCmd {
131    pub async fn run(&self, opts: Opts) -> Result {
132        match &self.cmd {
133            Cmd::List {
134                task_queue,
135                description,
136                skip_simulate,
137            } => {
138                let client = opts.client().await?;
139                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
140
141                let task_queue: TaskQueueV0 = client
142                    .as_ref()
143                    .anchor_account(&task_queue_pubkey)
144                    .await?
145                    .ok_or_else(|| anyhow!("Topic account not found"))?;
146                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
147                let tasks = client
148                    .as_ref()
149                    .anchor_accounts::<TaskV0>(&task_keys)
150                    .await?;
151                let filtered_tasks = tasks.into_iter().filter(|(_, task)| {
152                    if let Some(task) = task {
153                        if let Some(description) = description {
154                            return task.description.starts_with(description);
155                        }
156                    }
157                    true
158                });
159
160                let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
161                let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
162                let now = clock.unix_timestamp;
163
164                let mut json_tasks = Vec::new();
165                for (pubkey, maybe_task) in filtered_tasks {
166                    if let Some(task) = maybe_task {
167                        let mut simulation_result = None;
168                        if !*skip_simulate && task.trigger.is_active(now) {
169                            simulation_result = simulate_task(&client, pubkey).await?;
170                        }
171
172                        json_tasks.push(Task {
173                            pubkey,
174                            id: task.id,
175                            description: task.description,
176                            trigger: Trigger::from(task.trigger),
177                            crank_reward: task.crank_reward,
178                            rent_refund: task.rent_refund,
179                            simulation_result,
180                            transaction: if self.verbose {
181                                Some(TransactionSource::from(task.transaction.clone()))
182                            } else {
183                                None
184                            },
185                        });
186                    }
187                }
188                print_json(&json_tasks)?;
189            }
190            Cmd::Close {
191                task_queue,
192                id: index,
193                description,
194                failed,
195            } => {
196                if index.is_none() && description.is_none() {
197                    return Err(anyhow!("Either id or description must be provided"));
198                }
199                if index.is_some() && description.is_some() {
200                    return Err(anyhow!("Only one of id or description can be provided"));
201                }
202                let client = opts.client().await?;
203                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
204                let task_queue: TaskQueueV0 = client
205                    .as_ref()
206                    .anchor_account(&task_queue_pubkey)
207                    .await?
208                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
209                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
210                let tasks = if let Some(index) = index {
211                    let task_key = tuktuk::task::key(&task_queue_pubkey, *index);
212                    let task = client
213                        .as_ref()
214                        .anchor_account::<TaskV0>(&task_key)
215                        .await?
216                        .ok_or_else(|| anyhow!("Task not found"))?;
217                    vec![(task_key, task)]
218                } else if let Some(description) = description {
219                    let tasks = client
220                        .as_ref()
221                        .anchor_accounts::<TaskV0>(&task_keys)
222                        .await?;
223                    tasks
224                        .into_iter()
225                        .filter(|(_, task)| {
226                            if let Some(task) = task {
227                                return task.description.starts_with(description);
228                            }
229                            false
230                        })
231                        .map(|(p, task)| (p, task.unwrap().clone()))
232                        .collect()
233                } else {
234                    vec![]
235                };
236
237                let mut seen_ids = HashSet::new();
238                let mut to_close = Vec::new();
239
240                // If failed flag is set, simulate each task first
241                for (pubkey, task) in &tasks {
242                    if seen_ids.insert(task.id) {
243                        if *failed {
244                            if let Some(sim_result) = simulate_task(&client, *pubkey).await? {
245                                if sim_result.error.is_some() {
246                                    to_close.push(task.clone());
247                                }
248                            }
249                        } else {
250                            to_close.push(task.clone());
251                        }
252                    }
253                }
254
255                let ixs = to_close
256                    .into_iter()
257                    .map(|task| {
258                        tuktuk::task::dequeue_ix(
259                            task_queue_pubkey,
260                            client.payer.pubkey(),
261                            task.rent_refund,
262                            task.id,
263                        )
264                        .map_err(|e| anyhow!("Failed to dequeue task: {}", e))
265                    })
266                    .collect::<Result<Vec<_>>>()?;
267
268                let ix_groups = ixs.into_iter().map(|ix| vec![ix]).collect_vec();
269                let groups = pack_instructions_into_transactions(
270                    &ix_groups.iter().map(|ix| ix.as_slice()).collect_vec(),
271                    &client.payer,
272                    None,
273                )?;
274
275                for mut to_send in groups {
276                    // Remove compute budget ixs
277                    to_send.instructions.remove(0);
278                    to_send.instructions.remove(0);
279                    send_instructions(
280                        client.rpc_client.clone(),
281                        &client.payer,
282                        client.opts.ws_url().as_str(),
283                        &to_send.instructions,
284                        &[],
285                    )
286                    .await?;
287                }
288            }
289            Cmd::Run {
290                task_queue,
291                id,
292                skip_preflight,
293                description,
294            } => {
295                if id.is_none() && description.is_none() {
296                    return Err(anyhow!("Either id or description must be provided"));
297                }
298                if id.is_some() && description.is_some() {
299                    return Err(anyhow!("Only one of id or description can be provided"));
300                }
301                let client = opts.client().await?;
302                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
303                let task_queue: TaskQueueV0 = client
304                    .as_ref()
305                    .anchor_account(&task_queue_pubkey)
306                    .await?
307                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
308                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
309                let tasks = if let Some(id) = id {
310                    let task_key = tuktuk::task::key(&task_queue_pubkey, *id);
311                    let task = client
312                        .as_ref()
313                        .anchor_account::<TaskV0>(&task_key)
314                        .await?
315                        .ok_or_else(|| anyhow!("Task not found"))?;
316                    vec![(task_key, task)]
317                } else if let Some(description) = description {
318                    let tasks = client
319                        .as_ref()
320                        .anchor_accounts::<TaskV0>(&task_keys)
321                        .await?;
322                    tasks
323                        .into_iter()
324                        .filter(|(_, task)| {
325                            if let Some(task) = task {
326                                return task.description.starts_with(description);
327                            }
328                            false
329                        })
330                        .map(|(p, task)| (p, task.unwrap().clone()))
331                        .collect()
332                } else {
333                    vec![]
334                };
335                for (task_key, _) in tasks {
336                    let run_ix_result = tuktuk_sdk::compiled_transaction::run_ix(
337                        client.as_ref(),
338                        task_key,
339                        client.payer.pubkey(),
340                        &HashSet::new(),
341                    )
342                    .await;
343                    match run_ix_result {
344                        Ok(Some(run_ix)) => {
345                            let blockhash = client.rpc_client.get_latest_blockhash().await?;
346                            let (computed, _) = auto_compute_limit_and_price(
347                                &client.rpc_client,
348                                &run_ix.instructions,
349                                1.2,
350                                &client.payer.pubkey(),
351                                Some(blockhash),
352                                Some(run_ix.lookup_tables.clone()),
353                            )
354                            .await
355                            .unwrap();
356
357                            let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
358                            let message = VersionedMessage::V0(v0::Message::try_compile(
359                                &client.payer.pubkey(),
360                                &computed,
361                                &run_ix.lookup_tables,
362                                recent_blockhash,
363                            )?);
364                            let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
365                            let txid = client
366                                .rpc_client
367                                .send_transaction_with_config(
368                                    &tx,
369                                    solana_client::rpc_config::RpcSendTransactionConfig {
370                                        skip_preflight: *skip_preflight,
371                                        preflight_commitment: Some(CommitmentLevel::Confirmed),
372                                        ..Default::default()
373                                    },
374                                )
375                                .await?;
376
377                            println!("Tx sent: {}", txid);
378                        }
379                        Err(e) => {
380                            println!("Error running task: {}", e);
381                        }
382                        _ => {}
383                    }
384                }
385            }
386        }
387        Ok(())
388    }
389}
390
/// JSON view of a single task, as printed by the `list` subcommand.
#[derive(Serialize)]
struct Task {
    /// Key of the task account the task was fetched from.
    #[serde(with = "serde_pubkey")]
    pub pubkey: Pubkey,
    /// Copied from `TaskV0::id`.
    pub id: u16,
    /// Copied from `TaskV0::description`.
    pub description: String,
    /// Copied from `TaskV0::rent_refund`.
    #[serde(with = "serde_pubkey")]
    pub rent_refund: Pubkey,
    /// The task's trigger, converted from `TriggerV0`.
    pub trigger: Trigger,
    /// Copied from `TaskV0::crank_reward`.
    pub crank_reward: u64,
    /// Result of simulating the task; `None` when simulation was skipped,
    /// the trigger was not active, or no run instruction was produced.
    pub simulation_result: Option<SimulationResult>,
    /// The task's transaction; only populated when `--verbose` is set.
    pub transaction: Option<TransactionSource>,
}
404
/// Outcome of simulating a task's run transaction, as built by `simulate_task`.
#[derive(Serialize)]
struct SimulationResult {
    /// Stringified error of the simulated transaction, or of the simulation
    /// RPC call itself when that call failed; `None` on success.
    pub error: Option<String>,
    /// Logs returned by the simulation (empty when none were returned);
    /// `None` when the simulation RPC call failed.
    pub logs: Option<Vec<String>>,
    /// Units consumed as reported by the simulation; `None` when not
    /// reported or when the simulation RPC call failed.
    pub compute_units: Option<u64>,
}
411
/// Serializable mirror of `TriggerV0`, used in the `list` JSON output.
#[derive(Serialize)]
enum Trigger {
    /// Mirrors `TriggerV0::Now`.
    Now,
    /// Mirrors `TriggerV0::Timestamp`; the value is compared against the
    /// cluster clock's `unix_timestamp` elsewhere in this file, so it is
    /// presumably a Unix timestamp in seconds — TODO confirm.
    Timestamp(i64),
}
417
418impl From<TriggerV0> for Trigger {
419    fn from(trigger: TriggerV0) -> Self {
420        match trigger {
421            TriggerV0::Now => Trigger::Now,
422            TriggerV0::Timestamp(ts) => Trigger::Timestamp(ts),
423        }
424    }
425}