tuktuk_cli/cmd/
task.rs

1use std::collections::HashSet;
2
3use anyhow::anyhow;
4use clap::{Args, Subcommand};
5use clock::SYSVAR_CLOCK;
6use serde::Serialize;
7use solana_client::rpc_config::RpcSimulateTransactionConfig;
8use solana_sdk::{
9    commitment_config::CommitmentLevel,
10    message::{v0, VersionedMessage},
11    pubkey::Pubkey,
12    signer::Signer,
13    transaction::VersionedTransaction,
14};
15use solana_transaction_utils::{
16    pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
17};
18use tuktuk_program::{types::TriggerV0, TaskQueueV0, TaskV0};
19use tuktuk_sdk::prelude::*;
20
21use super::{task_queue::TaskQueueArg, TransactionSource};
22use crate::{
23    client::{send_instructions, CliClient},
24    cmd::Opts,
25    result::Result,
26    serde::{print_json, serde_pubkey},
27};
28
29#[derive(Debug, Args)]
30pub struct TaskCmd {
31    #[arg(long, default_value = "false")]
32    pub verbose: bool,
33    #[command(subcommand)]
34    pub cmd: Cmd,
35}
36
37#[derive(Debug, Subcommand)]
38pub enum Cmd {
39    List {
40        #[command(flatten)]
41        task_queue: TaskQueueArg,
42        // Description prefix for the task to filter by
43        #[arg(long)]
44        description: Option<String>,
45        #[arg(long, default_value = "false")]
46        skip_simulate: bool,
47    },
48    Run {
49        #[command(flatten)]
50        task_queue: TaskQueueArg,
51        #[arg(short, long)]
52        id: Option<u16>,
53        // Description prefix to run by
54        #[arg(long)]
55        description: Option<String>,
56        #[arg(short, long, default_value = "false")]
57        skip_preflight: bool,
58    },
59    Close {
60        #[command(flatten)]
61        task_queue: TaskQueueArg,
62        #[arg(short, long)]
63        id: Option<u16>,
64        // Description prefix to close by
65        #[arg(long)]
66        description: Option<String>,
67        #[arg(
68            long,
69            default_value = "false",
70            help = "Close tasks that fail simulation"
71        )]
72        failed: bool,
73    },
74}
75
76async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<SimulationResult>> {
77    // Get the run instruction
78    let run_ix = tuktuk_sdk::compiled_transaction::run_ix(
79        client.as_ref(),
80        task_key,
81        client.payer.pubkey(),
82        &HashSet::new(),
83    )
84    .await?;
85
86    if let Some(run_ix) = run_ix {
87        // Create and simulate the transaction
88        let mut updated_instructions = vec![
89            solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(1900000),
90        ];
91        updated_instructions.extend(run_ix.instructions.clone());
92        let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
93        let message = VersionedMessage::V0(v0::Message::try_compile(
94            &client.payer.pubkey(),
95            &updated_instructions,
96            &run_ix.lookup_tables,
97            recent_blockhash,
98        )?);
99        let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
100        let sim_result = client
101            .rpc_client
102            .simulate_transaction_with_config(
103                &tx,
104                RpcSimulateTransactionConfig {
105                    commitment: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()),
106                    sig_verify: true,
107                    ..Default::default()
108                },
109            )
110            .await;
111
112        match sim_result {
113            Ok(simulated) => Ok(Some(SimulationResult {
114                error: simulated.value.err.map(|e| e.to_string()),
115                logs: Some(simulated.value.logs.unwrap_or_default()),
116                compute_units: simulated.value.units_consumed,
117            })),
118            Err(err) => Ok(Some(SimulationResult {
119                error: Some(err.to_string()),
120                logs: None,
121                compute_units: None,
122            })),
123        }
124    } else {
125        Ok(None)
126    }
127}
128
129impl TaskCmd {
130    pub async fn run(&self, opts: Opts) -> Result {
131        match &self.cmd {
132            Cmd::List {
133                task_queue,
134                description,
135                skip_simulate,
136            } => {
137                let client = opts.client().await?;
138                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
139
140                let task_queue: TaskQueueV0 = client
141                    .as_ref()
142                    .anchor_account(&task_queue_pubkey)
143                    .await?
144                    .ok_or_else(|| anyhow!("Topic account not found"))?;
145                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
146                let tasks = client
147                    .as_ref()
148                    .anchor_accounts::<TaskV0>(&task_keys)
149                    .await?;
150                let filtered_tasks = tasks.into_iter().filter(|(_, task)| {
151                    if let Some(task) = task {
152                        if let Some(description) = description {
153                            return task.description.starts_with(description);
154                        }
155                    }
156                    true
157                });
158
159                let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
160                let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
161                let now = clock.unix_timestamp;
162
163                let mut json_tasks = Vec::new();
164                for (pubkey, maybe_task) in filtered_tasks {
165                    if let Some(task) = maybe_task {
166                        let mut simulation_result = None;
167                        if !*skip_simulate && task.trigger.is_active(now) {
168                            simulation_result = simulate_task(&client, pubkey).await?;
169                        }
170
171                        json_tasks.push(Task {
172                            pubkey,
173                            id: task.id,
174                            description: task.description,
175                            trigger: Trigger::from(task.trigger),
176                            crank_reward: task.crank_reward,
177                            rent_refund: task.rent_refund,
178                            simulation_result,
179                            transaction: if self.verbose {
180                                Some(TransactionSource::from(task.transaction.clone()))
181                            } else {
182                                None
183                            },
184                        });
185                    }
186                }
187                print_json(&json_tasks)?;
188            }
189            Cmd::Close {
190                task_queue,
191                id: index,
192                description,
193                failed,
194            } => {
195                if index.is_none() && description.is_none() {
196                    return Err(anyhow!("Either id or description must be provided"));
197                }
198                if index.is_some() && description.is_some() {
199                    return Err(anyhow!("Only one of id or description can be provided"));
200                }
201                let client = opts.client().await?;
202                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
203                let task_queue: TaskQueueV0 = client
204                    .as_ref()
205                    .anchor_account(&task_queue_pubkey)
206                    .await?
207                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
208                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
209                let tasks = if let Some(index) = index {
210                    let task_key = tuktuk::task::key(&task_queue_pubkey, *index);
211                    let task = client
212                        .as_ref()
213                        .anchor_account::<TaskV0>(&task_key)
214                        .await?
215                        .ok_or_else(|| anyhow!("Task not found"))?;
216                    vec![(task_key, task)]
217                } else if let Some(description) = description {
218                    let tasks = client
219                        .as_ref()
220                        .anchor_accounts::<TaskV0>(&task_keys)
221                        .await?;
222                    tasks
223                        .into_iter()
224                        .filter(|(_, task)| {
225                            if let Some(task) = task {
226                                return task.description.starts_with(description);
227                            }
228                            false
229                        })
230                        .map(|(p, task)| (p, task.unwrap().clone()))
231                        .collect()
232                } else {
233                    vec![]
234                };
235
236                let mut seen_ids = HashSet::new();
237                let mut to_close = Vec::new();
238
239                // If failed flag is set, simulate each task first
240                for (pubkey, task) in &tasks {
241                    if seen_ids.insert(task.id) {
242                        if *failed {
243                            if let Some(sim_result) = simulate_task(&client, *pubkey).await? {
244                                if sim_result.error.is_some() {
245                                    to_close.push(task.clone());
246                                }
247                            }
248                        } else {
249                            to_close.push(task.clone());
250                        }
251                    }
252                }
253
254                let ixs = to_close
255                    .into_iter()
256                    .map(|task| {
257                        tuktuk::task::dequeue_ix(
258                            task_queue_pubkey,
259                            client.payer.pubkey(),
260                            task.rent_refund,
261                            task.id,
262                        )
263                        .map_err(|e| anyhow!("Failed to dequeue task: {}", e))
264                    })
265                    .collect::<Result<Vec<_>>>()?;
266
267                let groups = pack_instructions_into_transactions(
268                    ixs.into_iter().map(|ix| vec![ix]).collect(),
269                    &client.payer,
270                    None,
271                )?;
272
273                for mut to_send in groups {
274                    // Remove compute budget ixs
275                    to_send.instructions.remove(0);
276                    to_send.instructions.remove(0);
277                    send_instructions(
278                        client.rpc_client.clone(),
279                        &client.payer,
280                        client.opts.ws_url().as_str(),
281                        to_send.instructions,
282                        &[],
283                    )
284                    .await?;
285                }
286            }
287            Cmd::Run {
288                task_queue,
289                id,
290                skip_preflight,
291                description,
292            } => {
293                if id.is_none() && description.is_none() {
294                    return Err(anyhow!("Either id or description must be provided"));
295                }
296                if id.is_some() && description.is_some() {
297                    return Err(anyhow!("Only one of id or description can be provided"));
298                }
299                let client = opts.client().await?;
300                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
301                let task_queue: TaskQueueV0 = client
302                    .as_ref()
303                    .anchor_account(&task_queue_pubkey)
304                    .await?
305                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
306                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
307                let tasks = if let Some(id) = id {
308                    let task_key = tuktuk::task::key(&task_queue_pubkey, *id);
309                    let task = client
310                        .as_ref()
311                        .anchor_account::<TaskV0>(&task_key)
312                        .await?
313                        .ok_or_else(|| anyhow!("Task not found"))?;
314                    vec![(task_key, task)]
315                } else if let Some(description) = description {
316                    let tasks = client
317                        .as_ref()
318                        .anchor_accounts::<TaskV0>(&task_keys)
319                        .await?;
320                    tasks
321                        .into_iter()
322                        .filter(|(_, task)| {
323                            if let Some(task) = task {
324                                return task.description.starts_with(description);
325                            }
326                            false
327                        })
328                        .map(|(p, task)| (p, task.unwrap().clone()))
329                        .collect()
330                } else {
331                    vec![]
332                };
333                for (task_key, _) in tasks {
334                    let run_ix_result = tuktuk_sdk::compiled_transaction::run_ix(
335                        client.as_ref(),
336                        task_key,
337                        client.payer.pubkey(),
338                        &HashSet::new(),
339                    )
340                    .await;
341                    match run_ix_result {
342                        Ok(Some(run_ix)) => {
343                            let blockhash = client.rpc_client.get_latest_blockhash().await?;
344                            let (computed, _) = auto_compute_limit_and_price(
345                                &client.rpc_client,
346                                run_ix.instructions,
347                                1.2,
348                                &client.payer.pubkey(),
349                                Some(blockhash),
350                                Some(run_ix.lookup_tables.clone()),
351                            )
352                            .await
353                            .unwrap();
354
355                            let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
356                            let message = VersionedMessage::V0(v0::Message::try_compile(
357                                &client.payer.pubkey(),
358                                &computed,
359                                &run_ix.lookup_tables,
360                                recent_blockhash,
361                            )?);
362                            let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
363                            let txid = client
364                                .rpc_client
365                                .send_transaction_with_config(
366                                    &tx,
367                                    solana_client::rpc_config::RpcSendTransactionConfig {
368                                        skip_preflight: *skip_preflight,
369                                        preflight_commitment: Some(CommitmentLevel::Confirmed),
370                                        ..Default::default()
371                                    },
372                                )
373                                .await?;
374
375                            println!("Tx sent: {}", txid);
376                        }
377                        Err(e) => {
378                            println!("Error running task: {}", e);
379                        }
380                        _ => {}
381                    }
382                }
383            }
384        }
385        Ok(())
386    }
387}
388
389#[derive(Serialize)]
390struct Task {
391    #[serde(with = "serde_pubkey")]
392    pub pubkey: Pubkey,
393    pub id: u16,
394    pub description: String,
395    #[serde(with = "serde_pubkey")]
396    pub rent_refund: Pubkey,
397    pub trigger: Trigger,
398    pub crank_reward: u64,
399    pub simulation_result: Option<SimulationResult>,
400    pub transaction: Option<TransactionSource>,
401}
402
403#[derive(Serialize)]
404struct SimulationResult {
405    pub error: Option<String>,
406    pub logs: Option<Vec<String>>,
407    pub compute_units: Option<u64>,
408}
409
410#[derive(Serialize)]
411enum Trigger {
412    Now,
413    Timestamp(i64),
414}
415
416impl From<TriggerV0> for Trigger {
417    fn from(trigger: TriggerV0) -> Self {
418        match trigger {
419            TriggerV0::Now => Trigger::Now,
420            TriggerV0::Timestamp(ts) => Trigger::Timestamp(ts),
421        }
422    }
423}