1use std::collections::HashSet;
2
3use anyhow::anyhow;
4use clap::{Args, Subcommand};
5use clock::SYSVAR_CLOCK;
6use itertools::Itertools;
7use serde::Serialize;
8use solana_client::rpc_config::RpcSimulateTransactionConfig;
9use solana_sdk::{
10 commitment_config::CommitmentLevel,
11 message::{v0, VersionedMessage},
12 pubkey::Pubkey,
13 signer::Signer,
14 transaction::VersionedTransaction,
15};
16use solana_transaction_utils::{
17 pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
18};
19use tuktuk_program::{types::TriggerV0, TaskQueueV0, TaskV0};
20use tuktuk_sdk::prelude::*;
21
22use super::{task_queue::TaskQueueArg, TransactionSource};
23use crate::{
24 client::{send_instructions, CliClient},
25 cmd::Opts,
26 result::Result,
27 serde::{print_json, serde_pubkey},
28};
29
30#[derive(Debug, Args)]
31pub struct TaskCmd {
32 #[arg(long, default_value = "false")]
33 pub verbose: bool,
34 #[command(subcommand)]
35 pub cmd: Cmd,
36}
37
38#[derive(Debug, Subcommand)]
39pub enum Cmd {
40 List {
41 #[command(flatten)]
42 task_queue: TaskQueueArg,
43 #[arg(long)]
45 description: Option<String>,
46 #[arg(long, default_value = "false")]
47 skip_simulate: bool,
48 },
49 Run {
50 #[command(flatten)]
51 task_queue: TaskQueueArg,
52 #[arg(short, long)]
53 id: Option<u16>,
54 #[arg(long)]
56 description: Option<String>,
57 #[arg(short, long, default_value = "false")]
58 skip_preflight: bool,
59 },
60 Close {
61 #[command(flatten)]
62 task_queue: TaskQueueArg,
63 #[arg(short, long)]
64 id: Option<u16>,
65 #[arg(long)]
67 description: Option<String>,
68 #[arg(
69 long,
70 default_value = "false",
71 help = "Close tasks that fail simulation"
72 )]
73 failed: bool,
74 },
75}
76
77async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<SimulationResult>> {
78 let run_ix = tuktuk_sdk::compiled_transaction::run_ix(
80 client.as_ref(),
81 task_key,
82 client.payer.pubkey(),
83 &HashSet::new(),
84 )
85 .await?;
86
87 if let Some(run_ix) = run_ix {
88 let mut updated_instructions = vec![
90 solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(1900000),
91 ];
92 updated_instructions.extend(run_ix.instructions.clone());
93 let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
94 let message = VersionedMessage::V0(v0::Message::try_compile(
95 &client.payer.pubkey(),
96 &updated_instructions,
97 &run_ix.lookup_tables,
98 recent_blockhash,
99 )?);
100 let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
101 let sim_result = client
102 .rpc_client
103 .simulate_transaction_with_config(
104 &tx,
105 RpcSimulateTransactionConfig {
106 commitment: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()),
107 sig_verify: true,
108 ..Default::default()
109 },
110 )
111 .await;
112
113 match sim_result {
114 Ok(simulated) => Ok(Some(SimulationResult {
115 error: simulated.value.err.map(|e| e.to_string()),
116 logs: Some(simulated.value.logs.unwrap_or_default()),
117 compute_units: simulated.value.units_consumed,
118 })),
119 Err(err) => Ok(Some(SimulationResult {
120 error: Some(err.to_string()),
121 logs: None,
122 compute_units: None,
123 })),
124 }
125 } else {
126 Ok(None)
127 }
128}
129
130impl TaskCmd {
131 pub async fn run(&self, opts: Opts) -> Result {
132 match &self.cmd {
133 Cmd::List {
134 task_queue,
135 description,
136 skip_simulate,
137 } => {
138 let client = opts.client().await?;
139 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
140
141 let task_queue: TaskQueueV0 = client
142 .as_ref()
143 .anchor_account(&task_queue_pubkey)
144 .await?
145 .ok_or_else(|| anyhow!("Topic account not found"))?;
146 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
147 let tasks = client
148 .as_ref()
149 .anchor_accounts::<TaskV0>(&task_keys)
150 .await?;
151 let filtered_tasks = tasks.into_iter().filter(|(_, task)| {
152 if let Some(task) = task {
153 if let Some(description) = description {
154 return task.description.starts_with(description);
155 }
156 }
157 true
158 });
159
160 let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
161 let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
162 let now = clock.unix_timestamp;
163
164 let mut json_tasks = Vec::new();
165 for (pubkey, maybe_task) in filtered_tasks {
166 if let Some(task) = maybe_task {
167 let mut simulation_result = None;
168 if !*skip_simulate && task.trigger.is_active(now) {
169 simulation_result = simulate_task(&client, pubkey).await?;
170 }
171
172 json_tasks.push(Task {
173 pubkey,
174 id: task.id,
175 description: task.description,
176 trigger: Trigger::from(task.trigger),
177 crank_reward: task.crank_reward,
178 rent_refund: task.rent_refund,
179 simulation_result,
180 transaction: if self.verbose {
181 Some(TransactionSource::from(task.transaction.clone()))
182 } else {
183 None
184 },
185 });
186 }
187 }
188 print_json(&json_tasks)?;
189 }
190 Cmd::Close {
191 task_queue,
192 id: index,
193 description,
194 failed,
195 } => {
196 if index.is_none() && description.is_none() {
197 return Err(anyhow!("Either id or description must be provided"));
198 }
199 if index.is_some() && description.is_some() {
200 return Err(anyhow!("Only one of id or description can be provided"));
201 }
202 let client = opts.client().await?;
203 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
204 let task_queue: TaskQueueV0 = client
205 .as_ref()
206 .anchor_account(&task_queue_pubkey)
207 .await?
208 .ok_or_else(|| anyhow!("Task queue account not found"))?;
209 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
210 let tasks = if let Some(index) = index {
211 let task_key = tuktuk::task::key(&task_queue_pubkey, *index);
212 let task = client
213 .as_ref()
214 .anchor_account::<TaskV0>(&task_key)
215 .await?
216 .ok_or_else(|| anyhow!("Task not found"))?;
217 vec![(task_key, task)]
218 } else if let Some(description) = description {
219 let tasks = client
220 .as_ref()
221 .anchor_accounts::<TaskV0>(&task_keys)
222 .await?;
223 tasks
224 .into_iter()
225 .filter(|(_, task)| {
226 if let Some(task) = task {
227 return task.description.starts_with(description);
228 }
229 false
230 })
231 .map(|(p, task)| (p, task.unwrap().clone()))
232 .collect()
233 } else {
234 vec![]
235 };
236
237 let mut seen_ids = HashSet::new();
238 let mut to_close = Vec::new();
239
240 for (pubkey, task) in &tasks {
242 if seen_ids.insert(task.id) {
243 if *failed {
244 if let Some(sim_result) = simulate_task(&client, *pubkey).await? {
245 if sim_result.error.is_some() {
246 to_close.push(task.clone());
247 }
248 }
249 } else {
250 to_close.push(task.clone());
251 }
252 }
253 }
254
255 let ixs = to_close
256 .into_iter()
257 .map(|task| {
258 tuktuk::task::dequeue_ix(
259 task_queue_pubkey,
260 client.payer.pubkey(),
261 task.rent_refund,
262 task.id,
263 )
264 .map_err(|e| anyhow!("Failed to dequeue task: {}", e))
265 })
266 .collect::<Result<Vec<_>>>()?;
267
268 let ix_groups = ixs.into_iter().map(|ix| vec![ix]).collect_vec();
269 let groups = pack_instructions_into_transactions(
270 &ix_groups.iter().map(|ix| ix.as_slice()).collect_vec(),
271 &client.payer,
272 None,
273 )?;
274
275 for mut to_send in groups {
276 to_send.instructions.remove(0);
278 to_send.instructions.remove(0);
279 send_instructions(
280 client.rpc_client.clone(),
281 &client.payer,
282 client.opts.ws_url().as_str(),
283 &to_send.instructions,
284 &[],
285 )
286 .await?;
287 }
288 }
289 Cmd::Run {
290 task_queue,
291 id,
292 skip_preflight,
293 description,
294 } => {
295 if id.is_none() && description.is_none() {
296 return Err(anyhow!("Either id or description must be provided"));
297 }
298 if id.is_some() && description.is_some() {
299 return Err(anyhow!("Only one of id or description can be provided"));
300 }
301 let client = opts.client().await?;
302 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
303 let task_queue: TaskQueueV0 = client
304 .as_ref()
305 .anchor_account(&task_queue_pubkey)
306 .await?
307 .ok_or_else(|| anyhow!("Task queue account not found"))?;
308 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
309 let tasks = if let Some(id) = id {
310 let task_key = tuktuk::task::key(&task_queue_pubkey, *id);
311 let task = client
312 .as_ref()
313 .anchor_account::<TaskV0>(&task_key)
314 .await?
315 .ok_or_else(|| anyhow!("Task not found"))?;
316 vec![(task_key, task)]
317 } else if let Some(description) = description {
318 let tasks = client
319 .as_ref()
320 .anchor_accounts::<TaskV0>(&task_keys)
321 .await?;
322 tasks
323 .into_iter()
324 .filter(|(_, task)| {
325 if let Some(task) = task {
326 return task.description.starts_with(description);
327 }
328 false
329 })
330 .map(|(p, task)| (p, task.unwrap().clone()))
331 .collect()
332 } else {
333 vec![]
334 };
335 for (task_key, _) in tasks {
336 let run_ix_result = tuktuk_sdk::compiled_transaction::run_ix(
337 client.as_ref(),
338 task_key,
339 client.payer.pubkey(),
340 &HashSet::new(),
341 )
342 .await;
343 match run_ix_result {
344 Ok(Some(run_ix)) => {
345 let blockhash = client.rpc_client.get_latest_blockhash().await?;
346 let (computed, _) = auto_compute_limit_and_price(
347 &client.rpc_client,
348 &run_ix.instructions,
349 1.2,
350 &client.payer.pubkey(),
351 Some(blockhash),
352 Some(run_ix.lookup_tables.clone()),
353 )
354 .await
355 .unwrap();
356
357 let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
358 let message = VersionedMessage::V0(v0::Message::try_compile(
359 &client.payer.pubkey(),
360 &computed,
361 &run_ix.lookup_tables,
362 recent_blockhash,
363 )?);
364 let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
365 let txid = client
366 .rpc_client
367 .send_transaction_with_config(
368 &tx,
369 solana_client::rpc_config::RpcSendTransactionConfig {
370 skip_preflight: *skip_preflight,
371 preflight_commitment: Some(CommitmentLevel::Confirmed),
372 ..Default::default()
373 },
374 )
375 .await?;
376
377 println!("Tx sent: {}", txid);
378 }
379 Err(e) => {
380 println!("Error running task: {}", e);
381 }
382 _ => {}
383 }
384 }
385 }
386 }
387 Ok(())
388 }
389}
390
391#[derive(Serialize)]
392struct Task {
393 #[serde(with = "serde_pubkey")]
394 pub pubkey: Pubkey,
395 pub id: u16,
396 pub description: String,
397 #[serde(with = "serde_pubkey")]
398 pub rent_refund: Pubkey,
399 pub trigger: Trigger,
400 pub crank_reward: u64,
401 pub simulation_result: Option<SimulationResult>,
402 pub transaction: Option<TransactionSource>,
403}
404
405#[derive(Serialize)]
406struct SimulationResult {
407 pub error: Option<String>,
408 pub logs: Option<Vec<String>>,
409 pub compute_units: Option<u64>,
410}
411
412#[derive(Serialize)]
413enum Trigger {
414 Now,
415 Timestamp(i64),
416}
417
418impl From<TriggerV0> for Trigger {
419 fn from(trigger: TriggerV0) -> Self {
420 match trigger {
421 TriggerV0::Now => Trigger::Now,
422 TriggerV0::Timestamp(ts) => Trigger::Timestamp(ts),
423 }
424 }
425}