tuktuk_cli/cmd/
task.rs

1use std::collections::HashSet;
2
3use anyhow::anyhow;
4use clap::{Args, Subcommand};
5use clock::SYSVAR_CLOCK;
6use serde::Serialize;
7use solana_client::rpc_config::RpcSimulateTransactionConfig;
8use solana_sdk::{
9    commitment_config::CommitmentLevel,
10    message::{v0, VersionedMessage},
11    pubkey::Pubkey,
12    signer::Signer,
13    transaction::VersionedTransaction,
14};
15use solana_transaction_utils::{
16    pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
17};
18use tuktuk_program::{types::TriggerV0, TaskQueueV0, TaskV0};
19use tuktuk_sdk::prelude::*;
20
21use super::{task_queue::TaskQueueArg, TransactionSource};
22use crate::{
23    client::send_instructions,
24    cmd::Opts,
25    result::Result,
26    serde::{print_json, serde_pubkey},
27};
28
// Command-line arguments shared by the task subcommands: an output verbosity
// flag plus the subcommand to execute.
#[derive(Debug, Args)]
pub struct TaskCmd {
    // When set, `List` also includes each task's transaction source in its
    // JSON output; otherwise that field is emitted as `null`.
    #[arg(long, default_value = "false")]
    pub verbose: bool,
    // The task subcommand (`List`, `Run` or `Close`) dispatched by `run`.
    #[command(subcommand)]
    pub cmd: Cmd,
}
36
37#[derive(Debug, Subcommand)]
38pub enum Cmd {
39    List {
40        #[command(flatten)]
41        task_queue: TaskQueueArg,
42        // Description prefix for the task to filter by
43        #[arg(long)]
44        description: Option<String>,
45    },
46    Run {
47        #[command(flatten)]
48        task_queue: TaskQueueArg,
49        #[arg(short, long)]
50        id: Option<u16>,
51        // Description prefix to run by
52        #[arg(long)]
53        description: Option<String>,
54        #[arg(short, long, default_value = "false")]
55        skip_preflight: bool,
56    },
57    Close {
58        #[command(flatten)]
59        task_queue: TaskQueueArg,
60        #[arg(short, long)]
61        id: Option<u16>,
62        // Description prefix to close by
63        #[arg(long)]
64        description: Option<String>,
65    },
66}
67
68impl TaskCmd {
69    pub async fn run(&self, opts: Opts) -> Result {
70        match &self.cmd {
71            Cmd::List {
72                task_queue,
73                description,
74            } => {
75                let client = opts.client().await?;
76                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
77
78                let task_queue: TaskQueueV0 = client
79                    .as_ref()
80                    .anchor_account(&task_queue_pubkey)
81                    .await?
82                    .ok_or_else(|| anyhow!("Topic account not found"))?;
83                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
84                let tasks = client
85                    .as_ref()
86                    .anchor_accounts::<TaskV0>(&task_keys)
87                    .await?;
88                let filtered_tasks = tasks.into_iter().filter(|(_, task)| {
89                    if let Some(task) = task {
90                        if let Some(description) = description {
91                            return task.description.starts_with(description);
92                        }
93                    }
94                    true
95                });
96
97                let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
98                let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
99                let now = clock.unix_timestamp;
100
101                let mut json_tasks = Vec::new();
102                for (pubkey, maybe_task) in filtered_tasks {
103                    if let Some(task) = maybe_task {
104                        let mut simulation_result = None;
105                        if task.trigger.is_active(now) {
106                            // Get the run instruction
107                            if let Ok(Some(run_ix)) = tuktuk_sdk::compiled_transaction::run_ix(
108                                client.as_ref(),
109                                pubkey,
110                                client.payer.pubkey(),
111                                &HashSet::new(),
112                            )
113                            .await
114                            {
115                                // Create and simulate the transaction
116                                let mut updated_instructions = vec![
117                                    solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(
118                                        1900000,
119                                    ),
120                                ];
121                                updated_instructions.extend(run_ix.instructions.clone());
122                                let recent_blockhash =
123                                    client.rpc_client.get_latest_blockhash().await?;
124                                let message = VersionedMessage::V0(v0::Message::try_compile(
125                                    &client.payer.pubkey(),
126                                    &updated_instructions,
127                                    &run_ix.lookup_tables,
128                                    recent_blockhash,
129                                )?);
130                                let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
131                                let sim_result = client
132                                    .rpc_client
133                                    .simulate_transaction_with_config(
134                                        &tx,
135                                        RpcSimulateTransactionConfig {
136                                            commitment: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()),
137                                            sig_verify: true,
138                                            ..Default::default()
139                                        },
140                                    )
141                                    .await;
142
143                                match sim_result {
144                                    Ok(simulated) => {
145                                        simulation_result = Some(SimulationResult {
146                                            error: simulated.value.err.map(|e| e.to_string()),
147                                            logs: Some(simulated.value.logs.unwrap_or_default()),
148                                            compute_units: simulated.value.units_consumed,
149                                        });
150                                    }
151                                    Err(err) => {
152                                        simulation_result = Some(SimulationResult {
153                                            error: Some(err.to_string()),
154                                            logs: None,
155                                            compute_units: None,
156                                        });
157                                    }
158                                }
159                            }
160                        }
161
162                        json_tasks.push(Task {
163                            pubkey,
164                            id: task.id,
165                            description: task.description,
166                            trigger: Trigger::from(task.trigger),
167                            crank_reward: task.crank_reward,
168                            rent_refund: task.rent_refund,
169                            simulation_result,
170                            transaction: if self.verbose {
171                                Some(TransactionSource::from(task.transaction.clone()))
172                            } else {
173                                None
174                            },
175                        });
176                    }
177                }
178                print_json(&json_tasks)?;
179            }
180            Cmd::Close {
181                task_queue,
182                id: index,
183                description,
184            } => {
185                if index.is_none() && description.is_none() {
186                    return Err(anyhow!("Either id or description must be provided"));
187                }
188                if index.is_some() && description.is_some() {
189                    return Err(anyhow!("Only one of id or description can be provided"));
190                }
191                let client = opts.client().await?;
192                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
193                let task_queue: TaskQueueV0 = client
194                    .as_ref()
195                    .anchor_account(&task_queue_pubkey)
196                    .await?
197                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
198                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
199                let tasks = if let Some(index) = index {
200                    let task_key = tuktuk::task::key(&task_queue_pubkey, *index);
201                    let task = client
202                        .as_ref()
203                        .anchor_account::<TaskV0>(&task_key)
204                        .await?
205                        .ok_or_else(|| anyhow!("Task not found"))?;
206                    vec![(task_key, task)]
207                } else if let Some(description) = description {
208                    let tasks = client
209                        .as_ref()
210                        .anchor_accounts::<TaskV0>(&task_keys)
211                        .await?;
212                    tasks
213                        .into_iter()
214                        .filter(|(_, task)| {
215                            if let Some(task) = task {
216                                return task.description.starts_with(description);
217                            }
218                            false
219                        })
220                        .map(|(p, task)| (p, task.unwrap().clone()))
221                        .collect()
222                } else {
223                    vec![]
224                };
225                let mut seen_ids = HashSet::new();
226                let ixs = tasks
227                    .into_iter()
228                    .filter(|(_, task)| seen_ids.insert(task.id)) // Returns true if id wasn't in set
229                    .map(|(_, task)| {
230                        tuktuk::task::dequeue_ix(
231                            task_queue_pubkey,
232                            client.payer.pubkey(),
233                            task.rent_refund,
234                            task.id,
235                        )
236                        .map_err(|e| anyhow!("Failed to dequeue task: {}", e))
237                    })
238                    .collect::<Result<Vec<_>>>()?;
239
240                let groups = pack_instructions_into_transactions(
241                    ixs.into_iter().map(|ix| vec![ix]).collect(),
242                    &client.payer,
243                    None,
244                )?;
245
246                for (mut to_send, _) in groups {
247                    // Remove compute budget ixs
248                    to_send.remove(0);
249                    to_send.remove(0);
250                    send_instructions(
251                        client.rpc_client.clone(),
252                        &client.payer,
253                        client.opts.ws_url().as_str(),
254                        to_send,
255                        &[],
256                    )
257                    .await?;
258                }
259            }
260            Cmd::Run {
261                task_queue,
262                id,
263                skip_preflight,
264                description,
265            } => {
266                if id.is_none() && description.is_none() {
267                    return Err(anyhow!("Either id or description must be provided"));
268                }
269                if id.is_some() && description.is_some() {
270                    return Err(anyhow!("Only one of id or description can be provided"));
271                }
272                let client = opts.client().await?;
273                let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
274                let task_queue: TaskQueueV0 = client
275                    .as_ref()
276                    .anchor_account(&task_queue_pubkey)
277                    .await?
278                    .ok_or_else(|| anyhow!("Task queue account not found"))?;
279                let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
280                let tasks = if let Some(id) = id {
281                    let task_key = tuktuk::task::key(&task_queue_pubkey, *id);
282                    let task = client
283                        .as_ref()
284                        .anchor_account::<TaskV0>(&task_key)
285                        .await?
286                        .ok_or_else(|| anyhow!("Task not found"))?;
287                    vec![(task_key, task)]
288                } else if let Some(description) = description {
289                    let tasks = client
290                        .as_ref()
291                        .anchor_accounts::<TaskV0>(&task_keys)
292                        .await?;
293                    tasks
294                        .into_iter()
295                        .filter(|(_, task)| {
296                            if let Some(task) = task {
297                                return task.description.starts_with(description);
298                            }
299                            false
300                        })
301                        .map(|(p, task)| (p, task.unwrap().clone()))
302                        .collect()
303                } else {
304                    vec![]
305                };
306                for (task_key, _) in tasks {
307                    let run_ix_result = tuktuk_sdk::compiled_transaction::run_ix(
308                        client.as_ref(),
309                        task_key,
310                        client.payer.pubkey(),
311                        &HashSet::new(),
312                    )
313                    .await;
314                    match run_ix_result {
315                        Ok(Some(run_ix)) => {
316                            let blockhash = client.rpc_client.get_latest_blockhash().await?;
317                            let (computed, _) = auto_compute_limit_and_price(
318                                &client.rpc_client,
319                                run_ix.instructions,
320                                1.2,
321                                &client.payer.pubkey(),
322                                Some(blockhash),
323                                Some(run_ix.lookup_tables.clone()),
324                            )
325                            .await
326                            .unwrap();
327
328                            let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
329                            let message = VersionedMessage::V0(v0::Message::try_compile(
330                                &client.payer.pubkey(),
331                                &computed,
332                                &run_ix.lookup_tables,
333                                recent_blockhash,
334                            )?);
335                            let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
336                            let txid = client
337                                .rpc_client
338                                .send_transaction_with_config(
339                                    &tx,
340                                    solana_client::rpc_config::RpcSendTransactionConfig {
341                                        skip_preflight: *skip_preflight,
342                                        preflight_commitment: Some(CommitmentLevel::Confirmed),
343                                        ..Default::default()
344                                    },
345                                )
346                                .await?;
347
348                            println!("Tx sent: {}", txid);
349                        }
350                        Err(e) => {
351                            println!("Error running task: {}", e);
352                        }
353                        _ => {}
354                    }
355                }
356            }
357        }
358        Ok(())
359    }
360}
361
/// JSON representation of a single task as printed by the `List` subcommand.
#[derive(Serialize)]
struct Task {
    /// Address of the task account.
    #[serde(with = "serde_pubkey")]
    pub pubkey: Pubkey,
    /// Id of the task, copied from the task account.
    pub id: u16,
    /// Description of the task, copied from the task account.
    pub description: String,
    /// `rent_refund` key copied from the task account.
    /// NOTE(review): presumably the account that receives the rent when the
    /// task is closed — confirm against the program.
    #[serde(with = "serde_pubkey")]
    pub rent_refund: Pubkey,
    /// Trigger that determines when the task becomes active.
    pub trigger: Trigger,
    /// `crank_reward` value copied from the task account.
    /// NOTE(review): unit is presumably lamports — confirm.
    pub crank_reward: u64,
    /// Outcome of simulating the task's run transaction; `None` when the
    /// trigger was not active or no run instruction could be built.
    pub simulation_result: Option<SimulationResult>,
    /// Transaction source of the task; only populated in verbose mode.
    pub transaction: Option<TransactionSource>,
}
375
/// Outcome of simulating a task's run transaction for the `List` output.
#[derive(Serialize)]
struct SimulationResult {
    /// Stringified simulation error, or the RPC error when the simulation
    /// request itself failed; `None` when the simulation reported no error.
    pub error: Option<String>,
    /// Logs returned by the simulation (empty when the RPC returned none);
    /// `None` when the simulation request itself failed.
    pub logs: Option<Vec<String>>,
    /// Compute units consumed as reported by the simulation, if any.
    pub compute_units: Option<u64>,
}
382
/// Serializable counterpart of `TriggerV0`, used in the JSON task listing.
#[derive(Serialize)]
enum Trigger {
    /// Mirrors `TriggerV0::Now`.
    Now,
    /// Mirrors `TriggerV0::Timestamp`.
    /// NOTE(review): presumably a Unix timestamp in seconds, since triggers
    /// are checked against the clock sysvar's `unix_timestamp` — confirm.
    Timestamp(i64),
}
388
389impl From<TriggerV0> for Trigger {
390    fn from(trigger: TriggerV0) -> Self {
391        match trigger {
392            TriggerV0::Now => Trigger::Now,
393            TriggerV0::Timestamp(ts) => Trigger::Timestamp(ts),
394        }
395    }
396}