tuktuk_cli/cmd/
task.rs

1use std::collections::HashSet;
2
3use anyhow::anyhow;
4use clap::{Args, Subcommand};
5use clock::SYSVAR_CLOCK;
6use itertools::Itertools;
7use serde::Serialize;
8use solana_client::rpc_config::RpcSimulateTransactionConfig;
9use solana_sdk::{
10    commitment_config::CommitmentLevel,
11    message::{v0, VersionedMessage},
12    pubkey::Pubkey,
13    signer::Signer,
14    transaction::VersionedTransaction,
15};
16use solana_transaction_utils::{
17    pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
18};
19use tuktuk_program::{types::TriggerV0, TaskQueueV0, TaskV0};
20use tuktuk_sdk::prelude::*;
21
22use super::{task_queue::TaskQueueArg, TransactionSource};
23use crate::{
24    client::{send_instructions, CliClient},
25    cmd::Opts,
26    result::Result,
27    serde::{print_json, serde_pubkey},
28};
29
30#[derive(Debug, Args)]
31pub struct TaskCmd {
32    #[arg(long, default_value = "false")]
33    pub verbose: bool,
34    #[command(subcommand)]
35    pub cmd: Cmd,
36}
37
38#[derive(Debug, Subcommand)]
39pub enum Cmd {
40    List {
41        #[command(flatten)]
42        task_queue: TaskQueueArg,
43        // Description prefix for the task to filter by
44        #[arg(long)]
45        description: Option<String>,
46        #[arg(long, default_value = "false")]
47        skip_simulate: bool,
48    },
49    Run {
50        #[command(flatten)]
51        task_queue: TaskQueueArg,
52        #[arg(short, long)]
53        id: Option<u16>,
54        // Description prefix to run by
55        #[arg(long)]
56        description: Option<String>,
57        #[arg(short, long, default_value = "false")]
58        skip_preflight: bool,
59    },
60    Close {
61        #[command(flatten)]
62        task_queue: TaskQueueArg,
63        #[arg(short, long)]
64        id: Option<u16>,
65        // Description prefix to close by
66        #[arg(long)]
67        description: Option<String>,
68        #[arg(
69            long,
70            default_value = "false",
71            help = "Close tasks that fail simulation"
72        )]
73        failed: bool,
74    },
75}
76
77async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<SimulationResult>> {
78    // Get the run instruction
79    let run_ix = tuktuk_sdk::compiled_transaction::run_ix(
80        client.as_ref(),
81        task_key,
82        client.payer.pubkey(),
83        &HashSet::new(),
84    )
85    .await?;
86
87    // Create and simulate the transaction
88    let mut updated_instructions =
89        vec![solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(1900000)];
90    updated_instructions.extend(run_ix.instructions.clone());
91    let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
92    let message = VersionedMessage::V0(v0::Message::try_compile(
93        &client.payer.pubkey(),
94        &updated_instructions,
95        &run_ix.lookup_tables,
96        recent_blockhash,
97    )?);
98    let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
99    let sim_result = client
100        .rpc_client
101        .simulate_transaction_with_config(
102            &tx,
103            RpcSimulateTransactionConfig {
104                commitment: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()),
105                sig_verify: true,
106                ..Default::default()
107            },
108        )
109        .await;
110
111    match sim_result {
112        Ok(simulated) => Ok(Some(SimulationResult {
113            error: simulated.value.err.map(|e| e.to_string()),
114            logs: Some(simulated.value.logs.unwrap_or_default()),
115            compute_units: simulated.value.units_consumed,
116        })),
117        Err(err) => Ok(Some(SimulationResult {
118            error: Some(err.to_string()),
119            logs: None,
120            compute_units: None,
121        })),
122    }
123}
124
125impl TaskCmd {
126    pub async fn run(&self, opts: Opts) -> Result {
127        match &self.cmd {
128            Cmd::List {
129                task_queue,
130                description,
131                skip_simulate,
132            } => {
133                let client = opts.client().await?;
134                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
135
136                let task_queue: TaskQueueV0 = client
137                    .as_ref()
138                    .anchor_account(&task_queue_pubkey)
139                    .await?
140                    .ok_or_else(|| anyhow!("Topic account not found"))?;
141                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
142                let tasks = client
143                    .as_ref()
144                    .anchor_accounts::<TaskV0>(&task_keys)
145                    .await?;
146                let filtered_tasks = tasks.into_iter().filter(|(_, task)| {
147                    if let Some(task) = task {
148                        if let Some(description) = description {
149                            return task.description.starts_with(description);
150                        }
151                    }
152                    true
153                });
154
155                let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
156                let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
157                let now = clock.unix_timestamp;
158
159                let mut json_tasks = Vec::new();
160                for (pubkey, maybe_task) in filtered_tasks {
161                    if let Some(task) = maybe_task {
162                        let mut simulation_result = None;
163                        if !*skip_simulate && task.trigger.is_active(now) {
164                            simulation_result = simulate_task(&client, pubkey).await?;
165                        }
166
167                        json_tasks.push(Task {
168                            pubkey,
169                            id: task.id,
170                            description: task.description,
171                            trigger: Trigger::from(task.trigger),
172                            crank_reward: task.crank_reward,
173                            rent_refund: task.rent_refund,
174                            simulation_result,
175                            transaction: if self.verbose {
176                                Some(TransactionSource::from(task.transaction.clone()))
177                            } else {
178                                None
179                            },
180                        });
181                    }
182                }
183                print_json(&json_tasks)?;
184            }
185            Cmd::Close {
186                task_queue,
187                id: index,
188                description,
189                failed,
190            } => {
191                if index.is_none() && description.is_none() {
192                    return Err(anyhow!("Either id or description must be provided"));
193                }
194                if index.is_some() && description.is_some() {
195                    return Err(anyhow!("Only one of id or description can be provided"));
196                }
197                let client = opts.client().await?;
198                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
199                let task_queue: TaskQueueV0 = client
200                    .as_ref()
201                    .anchor_account(&task_queue_pubkey)
202                    .await?
203                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
204                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
205                let tasks = if let Some(index) = index {
206                    let task_key = tuktuk::task::key(&task_queue_pubkey, *index);
207                    let task = client
208                        .as_ref()
209                        .anchor_account::<TaskV0>(&task_key)
210                        .await?
211                        .ok_or_else(|| anyhow!("Task not found"))?;
212                    vec![(task_key, task)]
213                } else if let Some(description) = description {
214                    let tasks = client
215                        .as_ref()
216                        .anchor_accounts::<TaskV0>(&task_keys)
217                        .await?;
218                    tasks
219                        .into_iter()
220                        .filter(|(_, task)| {
221                            if let Some(task) = task {
222                                return task.description.starts_with(description);
223                            }
224                            false
225                        })
226                        .map(|(p, task)| (p, task.unwrap().clone()))
227                        .collect()
228                } else {
229                    vec![]
230                };
231
232                let mut seen_ids = HashSet::new();
233                let mut to_close = Vec::new();
234
235                // If failed flag is set, simulate each task first
236                for (pubkey, task) in &tasks {
237                    if seen_ids.insert(task.id) {
238                        if *failed {
239                            if let Some(sim_result) = simulate_task(&client, *pubkey).await? {
240                                if sim_result.error.is_some() {
241                                    to_close.push(task.clone());
242                                }
243                            }
244                        } else {
245                            to_close.push(task.clone());
246                        }
247                    }
248                }
249
250                let ixs = to_close
251                    .into_iter()
252                    .map(|task| {
253                        tuktuk::task::dequeue_ix(
254                            task_queue_pubkey,
255                            client.payer.pubkey(),
256                            task.rent_refund,
257                            task.id,
258                        )
259                        .map_err(|e| anyhow!("Failed to dequeue task: {}", e))
260                    })
261                    .collect::<Result<Vec<_>>>()?;
262
263                let ix_groups = ixs.into_iter().map(|ix| vec![ix]).collect_vec();
264                let groups = pack_instructions_into_transactions(
265                    &ix_groups.iter().map(|ix| ix.as_slice()).collect_vec(),
266                    &client.payer,
267                    None,
268                )?;
269
270                for mut to_send in groups {
271                    // Remove compute budget ixs
272                    to_send.instructions.remove(0);
273                    to_send.instructions.remove(0);
274                    send_instructions(
275                        client.rpc_client.clone(),
276                        &client.payer,
277                        client.opts.ws_url().as_str(),
278                        &to_send.instructions,
279                        &[],
280                    )
281                    .await?;
282                }
283            }
284            Cmd::Run {
285                task_queue,
286                id,
287                skip_preflight,
288                description,
289            } => {
290                if id.is_none() && description.is_none() {
291                    return Err(anyhow!("Either id or description must be provided"));
292                }
293                if id.is_some() && description.is_some() {
294                    return Err(anyhow!("Only one of id or description can be provided"));
295                }
296                let client = opts.client().await?;
297                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
298                let task_queue: TaskQueueV0 = client
299                    .as_ref()
300                    .anchor_account(&task_queue_pubkey)
301                    .await?
302                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
303                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
304                let tasks = if let Some(id) = id {
305                    let task_key = tuktuk::task::key(&task_queue_pubkey, *id);
306                    let task = client
307                        .as_ref()
308                        .anchor_account::<TaskV0>(&task_key)
309                        .await?
310                        .ok_or_else(|| anyhow!("Task not found"))?;
311                    vec![(task_key, task)]
312                } else if let Some(description) = description {
313                    let tasks = client
314                        .as_ref()
315                        .anchor_accounts::<TaskV0>(&task_keys)
316                        .await?;
317                    tasks
318                        .into_iter()
319                        .filter(|(_, task)| {
320                            if let Some(task) = task {
321                                return task.description.starts_with(description);
322                            }
323                            false
324                        })
325                        .map(|(p, task)| (p, task.unwrap().clone()))
326                        .collect()
327                } else {
328                    vec![]
329                };
330                for (task_key, _) in tasks {
331                    let run_ix_result = tuktuk_sdk::compiled_transaction::run_ix(
332                        client.as_ref(),
333                        task_key,
334                        client.payer.pubkey(),
335                        &HashSet::new(),
336                    )
337                    .await;
338                    match run_ix_result {
339                        Ok(run_ix) => {
340                            let blockhash = client.rpc_client.get_latest_blockhash().await?;
341                            let (computed, _) = auto_compute_limit_and_price(
342                                &client.rpc_client,
343                                &run_ix.instructions,
344                                1.2,
345                                &client.payer.pubkey(),
346                                Some(blockhash),
347                                Some(run_ix.lookup_tables.clone()),
348                            )
349                            .await
350                            .unwrap();
351
352                            let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
353                            let message = VersionedMessage::V0(v0::Message::try_compile(
354                                &client.payer.pubkey(),
355                                &computed,
356                                &run_ix.lookup_tables,
357                                recent_blockhash,
358                            )?);
359                            let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
360                            let txid = client
361                                .rpc_client
362                                .send_transaction_with_config(
363                                    &tx,
364                                    solana_client::rpc_config::RpcSendTransactionConfig {
365                                        skip_preflight: *skip_preflight,
366                                        preflight_commitment: Some(CommitmentLevel::Confirmed),
367                                        ..Default::default()
368                                    },
369                                )
370                                .await?;
371
372                            println!("Tx sent: {}", txid);
373                        }
374                        Err(e) => {
375                            println!("Error running task: {}", e);
376                        }
377                    }
378                }
379            }
380        }
381        Ok(())
382    }
383}
384
385#[derive(Serialize)]
386struct Task {
387    #[serde(with = "serde_pubkey")]
388    pub pubkey: Pubkey,
389    pub id: u16,
390    pub description: String,
391    #[serde(with = "serde_pubkey")]
392    pub rent_refund: Pubkey,
393    pub trigger: Trigger,
394    pub crank_reward: u64,
395    pub simulation_result: Option<SimulationResult>,
396    pub transaction: Option<TransactionSource>,
397}
398
399#[derive(Serialize)]
400struct SimulationResult {
401    pub error: Option<String>,
402    pub logs: Option<Vec<String>>,
403    pub compute_units: Option<u64>,
404}
405
406#[derive(Serialize)]
407enum Trigger {
408    Now,
409    Timestamp(i64),
410}
411
412impl From<TriggerV0> for Trigger {
413    fn from(trigger: TriggerV0) -> Self {
414        match trigger {
415            TriggerV0::Now => Trigger::Now,
416            TriggerV0::Timestamp(ts) => Trigger::Timestamp(ts),
417        }
418    }
419}