1use std::collections::HashSet;
2
3use anyhow::anyhow;
4use clap::{Args, Subcommand};
5use clock::SYSVAR_CLOCK;
6use itertools::Itertools;
7use serde::Serialize;
8use solana_client::rpc_config::RpcSimulateTransactionConfig;
9use solana_sdk::{
10 commitment_config::CommitmentLevel,
11 message::{v0, VersionedMessage},
12 pubkey::Pubkey,
13 signer::Signer,
14 transaction::VersionedTransaction,
15};
16use solana_transaction_utils::{
17 pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
18};
19use tuktuk_program::{types::TriggerV0, TaskQueueV0, TaskV0};
20use tuktuk_sdk::prelude::*;
21
22use super::{task_queue::TaskQueueArg, TransactionSource};
23use crate::{
24 client::{send_instructions, CliClient},
25 cmd::Opts,
26 result::Result,
27 serde::{print_json, serde_pubkey},
28};
29
// Arguments for the task command: one flag plus the subcommand to run.
// NOTE: plain `//` comments are used on purpose; clap's derive turns `///`
// doc comments into help text, which would change the CLI output.
#[derive(Debug, Args)]
pub struct TaskCmd {
    // When set, `List` output also includes each task's transaction source.
    #[arg(long, default_value = "false")]
    pub verbose: bool,
    // The task subcommand to execute.
    #[command(subcommand)]
    pub cmd: Cmd,
}
37
// Subcommands operating on the tasks of a task queue.
// NOTE: plain `//` comments are used on purpose; clap's derive turns `///`
// doc comments into help text, which would change the CLI output.
#[derive(Debug, Subcommand)]
pub enum Cmd {
    // Print the queue's tasks as JSON.
    List {
        // Identifies the task queue to inspect.
        #[command(flatten)]
        task_queue: TaskQueueArg,
        // Only show tasks whose description starts with this prefix.
        #[arg(long)]
        description: Option<String>,
        // Skip simulating tasks whose trigger is currently active.
        #[arg(long, default_value = "false")]
        skip_simulate: bool,
    },
    // Build and send the run transaction for the selected task(s).
    // Exactly one of `id` / `description` must be given.
    Run {
        // Identifies the task queue that owns the task(s).
        #[command(flatten)]
        task_queue: TaskQueueArg,
        // Id of the single task to run.
        #[arg(short, long)]
        id: Option<u16>,
        // Run every task whose description starts with this prefix.
        #[arg(long)]
        description: Option<String>,
        // Forwarded to the RPC send configuration as `skip_preflight`.
        #[arg(short, long, default_value = "false")]
        skip_preflight: bool,
    },
    // Dequeue the selected task(s).
    // Exactly one of `id` / `description` must be given.
    Close {
        // Identifies the task queue that owns the task(s).
        #[command(flatten)]
        task_queue: TaskQueueArg,
        // Id of the single task to close.
        #[arg(short, long)]
        id: Option<u16>,
        // Close every task whose description starts with this prefix.
        #[arg(long)]
        description: Option<String>,
        // When set, only tasks whose simulation reports an error are closed.
        #[arg(
            long,
            default_value = "false",
            help = "Close tasks that fail simulation"
        )]
        failed: bool,
    },
}
76
77async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<SimulationResult>> {
78 let run_ix = tuktuk_sdk::compiled_transaction::run_ix(
80 client.as_ref(),
81 task_key,
82 client.payer.pubkey(),
83 &HashSet::new(),
84 )
85 .await?;
86
87 let mut updated_instructions =
89 vec![solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(1900000)];
90 updated_instructions.extend(run_ix.instructions.clone());
91 let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
92 let message = VersionedMessage::V0(v0::Message::try_compile(
93 &client.payer.pubkey(),
94 &updated_instructions,
95 &run_ix.lookup_tables,
96 recent_blockhash,
97 )?);
98 let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
99 let sim_result = client
100 .rpc_client
101 .simulate_transaction_with_config(
102 &tx,
103 RpcSimulateTransactionConfig {
104 commitment: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()),
105 sig_verify: true,
106 ..Default::default()
107 },
108 )
109 .await;
110
111 match sim_result {
112 Ok(simulated) => Ok(Some(SimulationResult {
113 error: simulated.value.err.map(|e| e.to_string()),
114 logs: Some(simulated.value.logs.unwrap_or_default()),
115 compute_units: simulated.value.units_consumed,
116 })),
117 Err(err) => Ok(Some(SimulationResult {
118 error: Some(err.to_string()),
119 logs: None,
120 compute_units: None,
121 })),
122 }
123}
124
125impl TaskCmd {
126 pub async fn run(&self, opts: Opts) -> Result {
127 match &self.cmd {
128 Cmd::List {
129 task_queue,
130 description,
131 skip_simulate,
132 } => {
133 let client = opts.client().await?;
134 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
135
136 let task_queue: TaskQueueV0 = client
137 .as_ref()
138 .anchor_account(&task_queue_pubkey)
139 .await?
140 .ok_or_else(|| anyhow!("Topic account not found"))?;
141 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
142 let tasks = client
143 .as_ref()
144 .anchor_accounts::<TaskV0>(&task_keys)
145 .await?;
146 let filtered_tasks = tasks.into_iter().filter(|(_, task)| {
147 if let Some(task) = task {
148 if let Some(description) = description {
149 return task.description.starts_with(description);
150 }
151 }
152 true
153 });
154
155 let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
156 let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
157 let now = clock.unix_timestamp;
158
159 let mut json_tasks = Vec::new();
160 for (pubkey, maybe_task) in filtered_tasks {
161 if let Some(task) = maybe_task {
162 let mut simulation_result = None;
163 if !*skip_simulate && task.trigger.is_active(now) {
164 simulation_result = simulate_task(&client, pubkey).await?;
165 }
166
167 json_tasks.push(Task {
168 pubkey,
169 id: task.id,
170 description: task.description,
171 trigger: Trigger::from(task.trigger),
172 crank_reward: task.crank_reward,
173 rent_refund: task.rent_refund,
174 simulation_result,
175 transaction: if self.verbose {
176 Some(TransactionSource::from(task.transaction.clone()))
177 } else {
178 None
179 },
180 });
181 }
182 }
183 print_json(&json_tasks)?;
184 }
185 Cmd::Close {
186 task_queue,
187 id: index,
188 description,
189 failed,
190 } => {
191 if index.is_none() && description.is_none() {
192 return Err(anyhow!("Either id or description must be provided"));
193 }
194 if index.is_some() && description.is_some() {
195 return Err(anyhow!("Only one of id or description can be provided"));
196 }
197 let client = opts.client().await?;
198 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
199 let task_queue: TaskQueueV0 = client
200 .as_ref()
201 .anchor_account(&task_queue_pubkey)
202 .await?
203 .ok_or_else(|| anyhow!("Task queue account not found"))?;
204 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
205 let tasks = if let Some(index) = index {
206 let task_key = tuktuk::task::key(&task_queue_pubkey, *index);
207 let task = client
208 .as_ref()
209 .anchor_account::<TaskV0>(&task_key)
210 .await?
211 .ok_or_else(|| anyhow!("Task not found"))?;
212 vec![(task_key, task)]
213 } else if let Some(description) = description {
214 let tasks = client
215 .as_ref()
216 .anchor_accounts::<TaskV0>(&task_keys)
217 .await?;
218 tasks
219 .into_iter()
220 .filter(|(_, task)| {
221 if let Some(task) = task {
222 return task.description.starts_with(description);
223 }
224 false
225 })
226 .map(|(p, task)| (p, task.unwrap().clone()))
227 .collect()
228 } else {
229 vec![]
230 };
231
232 let mut seen_ids = HashSet::new();
233 let mut to_close = Vec::new();
234
235 for (pubkey, task) in &tasks {
237 if seen_ids.insert(task.id) {
238 if *failed {
239 if let Some(sim_result) = simulate_task(&client, *pubkey).await? {
240 if sim_result.error.is_some() {
241 to_close.push(task.clone());
242 }
243 }
244 } else {
245 to_close.push(task.clone());
246 }
247 }
248 }
249
250 let ixs = to_close
251 .into_iter()
252 .map(|task| {
253 tuktuk::task::dequeue_ix(
254 task_queue_pubkey,
255 client.payer.pubkey(),
256 task.rent_refund,
257 task.id,
258 )
259 .map_err(|e| anyhow!("Failed to dequeue task: {}", e))
260 })
261 .collect::<Result<Vec<_>>>()?;
262
263 let ix_groups = ixs.into_iter().map(|ix| vec![ix]).collect_vec();
264 let groups = pack_instructions_into_transactions(
265 &ix_groups.iter().map(|ix| ix.as_slice()).collect_vec(),
266 &client.payer,
267 None,
268 )?;
269
270 for mut to_send in groups {
271 to_send.instructions.remove(0);
273 to_send.instructions.remove(0);
274 send_instructions(
275 client.rpc_client.clone(),
276 &client.payer,
277 client.opts.ws_url().as_str(),
278 &to_send.instructions,
279 &[],
280 )
281 .await?;
282 }
283 }
284 Cmd::Run {
285 task_queue,
286 id,
287 skip_preflight,
288 description,
289 } => {
290 if id.is_none() && description.is_none() {
291 return Err(anyhow!("Either id or description must be provided"));
292 }
293 if id.is_some() && description.is_some() {
294 return Err(anyhow!("Only one of id or description can be provided"));
295 }
296 let client = opts.client().await?;
297 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
298 let task_queue: TaskQueueV0 = client
299 .as_ref()
300 .anchor_account(&task_queue_pubkey)
301 .await?
302 .ok_or_else(|| anyhow!("Task queue account not found"))?;
303 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
304 let tasks = if let Some(id) = id {
305 let task_key = tuktuk::task::key(&task_queue_pubkey, *id);
306 let task = client
307 .as_ref()
308 .anchor_account::<TaskV0>(&task_key)
309 .await?
310 .ok_or_else(|| anyhow!("Task not found"))?;
311 vec![(task_key, task)]
312 } else if let Some(description) = description {
313 let tasks = client
314 .as_ref()
315 .anchor_accounts::<TaskV0>(&task_keys)
316 .await?;
317 tasks
318 .into_iter()
319 .filter(|(_, task)| {
320 if let Some(task) = task {
321 return task.description.starts_with(description);
322 }
323 false
324 })
325 .map(|(p, task)| (p, task.unwrap().clone()))
326 .collect()
327 } else {
328 vec![]
329 };
330 for (task_key, _) in tasks {
331 let run_ix_result = tuktuk_sdk::compiled_transaction::run_ix(
332 client.as_ref(),
333 task_key,
334 client.payer.pubkey(),
335 &HashSet::new(),
336 )
337 .await;
338 match run_ix_result {
339 Ok(run_ix) => {
340 let blockhash = client.rpc_client.get_latest_blockhash().await?;
341 let (computed, _) = auto_compute_limit_and_price(
342 &client.rpc_client,
343 &run_ix.instructions,
344 1.2,
345 &client.payer.pubkey(),
346 Some(blockhash),
347 Some(run_ix.lookup_tables.clone()),
348 )
349 .await
350 .unwrap();
351
352 let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
353 let message = VersionedMessage::V0(v0::Message::try_compile(
354 &client.payer.pubkey(),
355 &computed,
356 &run_ix.lookup_tables,
357 recent_blockhash,
358 )?);
359 let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
360 let txid = client
361 .rpc_client
362 .send_transaction_with_config(
363 &tx,
364 solana_client::rpc_config::RpcSendTransactionConfig {
365 skip_preflight: *skip_preflight,
366 preflight_commitment: Some(CommitmentLevel::Confirmed),
367 ..Default::default()
368 },
369 )
370 .await?;
371
372 println!("Tx sent: {}", txid);
373 }
374 Err(e) => {
375 println!("Error running task: {}", e);
376 }
377 }
378 }
379 }
380 }
381 Ok(())
382 }
383}
384
/// JSON view of a single task as printed by the `List` subcommand.
#[derive(Serialize)]
struct Task {
    /// Address of the task account, serialized through `serde_pubkey`.
    #[serde(with = "serde_pubkey")]
    pub pubkey: Pubkey,
    /// Task id copied from the task account.
    pub id: u16,
    /// Task description copied from the task account.
    pub description: String,
    /// Rent refund address copied from the task account, serialized through
    /// `serde_pubkey`.
    #[serde(with = "serde_pubkey")]
    pub rent_refund: Pubkey,
    /// Trigger converted from the task account's `TriggerV0`.
    pub trigger: Trigger,
    /// Crank reward copied from the task account.
    pub crank_reward: u64,
    /// Simulation outcome; `None` when the task was not simulated.
    pub simulation_result: Option<SimulationResult>,
    /// Transaction source; only populated when `--verbose` is set.
    pub transaction: Option<TransactionSource>,
}
398
/// Outcome of simulating a task's run transaction.
#[derive(Serialize)]
struct SimulationResult {
    /// Error text, either from the simulated transaction or from a failed
    /// simulation request; `None` when the simulation reported no error.
    pub error: Option<String>,
    /// Log lines returned by the simulation; `None` when the request itself failed.
    pub logs: Option<Vec<String>>,
    /// Compute units consumed as reported by the simulation, if any.
    pub compute_units: Option<u64>,
}
405
/// Serializable counterpart of `TriggerV0`, used in the JSON task listing.
#[derive(Serialize)]
enum Trigger {
    /// Counterpart of `TriggerV0::Now`.
    Now,
    /// Counterpart of `TriggerV0::Timestamp`.
    /// NOTE(review): presumably a Unix timestamp in seconds - confirm.
    Timestamp(i64),
}
411
412impl From<TriggerV0> for Trigger {
413 fn from(trigger: TriggerV0) -> Self {
414 match trigger {
415 TriggerV0::Now => Trigger::Now,
416 TriggerV0::Timestamp(ts) => Trigger::Timestamp(ts),
417 }
418 }
419}