1use std::collections::HashSet;
2
3use anyhow::anyhow;
4use clap::{Args, Subcommand};
5use clock::SYSVAR_CLOCK;
6use serde::Serialize;
7use solana_client::rpc_config::RpcSimulateTransactionConfig;
8use solana_sdk::{
9 commitment_config::CommitmentLevel,
10 message::{v0, VersionedMessage},
11 pubkey::Pubkey,
12 signer::Signer,
13 transaction::VersionedTransaction,
14};
15use solana_transaction_utils::{
16 pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
17};
18use tuktuk_program::{types::TriggerV0, TaskQueueV0, TaskV0};
19use tuktuk_sdk::prelude::*;
20
21use super::{task_queue::TaskQueueArg, TransactionSource};
22use crate::{
23 client::{send_instructions, CliClient},
24 cmd::Opts,
25 result::Result,
26 serde::{print_json, serde_pubkey},
27};
28
29#[derive(Debug, Args)]
30pub struct TaskCmd {
31 #[arg(long, default_value = "false")]
32 pub verbose: bool,
33 #[command(subcommand)]
34 pub cmd: Cmd,
35}
36
37#[derive(Debug, Subcommand)]
38pub enum Cmd {
39 List {
40 #[command(flatten)]
41 task_queue: TaskQueueArg,
42 #[arg(long)]
44 description: Option<String>,
45 #[arg(long, default_value = "false")]
46 skip_simulate: bool,
47 },
48 Run {
49 #[command(flatten)]
50 task_queue: TaskQueueArg,
51 #[arg(short, long)]
52 id: Option<u16>,
53 #[arg(long)]
55 description: Option<String>,
56 #[arg(short, long, default_value = "false")]
57 skip_preflight: bool,
58 },
59 Close {
60 #[command(flatten)]
61 task_queue: TaskQueueArg,
62 #[arg(short, long)]
63 id: Option<u16>,
64 #[arg(long)]
66 description: Option<String>,
67 #[arg(
68 long,
69 default_value = "false",
70 help = "Close tasks that fail simulation"
71 )]
72 failed: bool,
73 },
74}
75
76async fn simulate_task(client: &CliClient, task_key: Pubkey) -> Result<Option<SimulationResult>> {
77 let run_ix = tuktuk_sdk::compiled_transaction::run_ix(
79 client.as_ref(),
80 task_key,
81 client.payer.pubkey(),
82 &HashSet::new(),
83 )
84 .await?;
85
86 if let Some(run_ix) = run_ix {
87 let mut updated_instructions = vec![
89 solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(1900000),
90 ];
91 updated_instructions.extend(run_ix.instructions.clone());
92 let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
93 let message = VersionedMessage::V0(v0::Message::try_compile(
94 &client.payer.pubkey(),
95 &updated_instructions,
96 &run_ix.lookup_tables,
97 recent_blockhash,
98 )?);
99 let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
100 let sim_result = client
101 .rpc_client
102 .simulate_transaction_with_config(
103 &tx,
104 RpcSimulateTransactionConfig {
105 commitment: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()),
106 sig_verify: true,
107 ..Default::default()
108 },
109 )
110 .await;
111
112 match sim_result {
113 Ok(simulated) => Ok(Some(SimulationResult {
114 error: simulated.value.err.map(|e| e.to_string()),
115 logs: Some(simulated.value.logs.unwrap_or_default()),
116 compute_units: simulated.value.units_consumed,
117 })),
118 Err(err) => Ok(Some(SimulationResult {
119 error: Some(err.to_string()),
120 logs: None,
121 compute_units: None,
122 })),
123 }
124 } else {
125 Ok(None)
126 }
127}
128
129impl TaskCmd {
130 pub async fn run(&self, opts: Opts) -> Result {
131 match &self.cmd {
132 Cmd::List {
133 task_queue,
134 description,
135 skip_simulate,
136 } => {
137 let client = opts.client().await?;
138 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
139
140 let task_queue: TaskQueueV0 = client
141 .as_ref()
142 .anchor_account(&task_queue_pubkey)
143 .await?
144 .ok_or_else(|| anyhow!("Topic account not found"))?;
145 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
146 let tasks = client
147 .as_ref()
148 .anchor_accounts::<TaskV0>(&task_keys)
149 .await?;
150 let filtered_tasks = tasks.into_iter().filter(|(_, task)| {
151 if let Some(task) = task {
152 if let Some(description) = description {
153 return task.description.starts_with(description);
154 }
155 }
156 true
157 });
158
159 let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
160 let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
161 let now = clock.unix_timestamp;
162
163 let mut json_tasks = Vec::new();
164 for (pubkey, maybe_task) in filtered_tasks {
165 if let Some(task) = maybe_task {
166 let mut simulation_result = None;
167 if !*skip_simulate && task.trigger.is_active(now) {
168 simulation_result = simulate_task(&client, pubkey).await?;
169 }
170
171 json_tasks.push(Task {
172 pubkey,
173 id: task.id,
174 description: task.description,
175 trigger: Trigger::from(task.trigger),
176 crank_reward: task.crank_reward,
177 rent_refund: task.rent_refund,
178 simulation_result,
179 transaction: if self.verbose {
180 Some(TransactionSource::from(task.transaction.clone()))
181 } else {
182 None
183 },
184 });
185 }
186 }
187 print_json(&json_tasks)?;
188 }
189 Cmd::Close {
190 task_queue,
191 id: index,
192 description,
193 failed,
194 } => {
195 if index.is_none() && description.is_none() {
196 return Err(anyhow!("Either id or description must be provided"));
197 }
198 if index.is_some() && description.is_some() {
199 return Err(anyhow!("Only one of id or description can be provided"));
200 }
201 let client = opts.client().await?;
202 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
203 let task_queue: TaskQueueV0 = client
204 .as_ref()
205 .anchor_account(&task_queue_pubkey)
206 .await?
207 .ok_or_else(|| anyhow!("Task queue account not found"))?;
208 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
209 let tasks = if let Some(index) = index {
210 let task_key = tuktuk::task::key(&task_queue_pubkey, *index);
211 let task = client
212 .as_ref()
213 .anchor_account::<TaskV0>(&task_key)
214 .await?
215 .ok_or_else(|| anyhow!("Task not found"))?;
216 vec![(task_key, task)]
217 } else if let Some(description) = description {
218 let tasks = client
219 .as_ref()
220 .anchor_accounts::<TaskV0>(&task_keys)
221 .await?;
222 tasks
223 .into_iter()
224 .filter(|(_, task)| {
225 if let Some(task) = task {
226 return task.description.starts_with(description);
227 }
228 false
229 })
230 .map(|(p, task)| (p, task.unwrap().clone()))
231 .collect()
232 } else {
233 vec![]
234 };
235
236 let mut seen_ids = HashSet::new();
237 let mut to_close = Vec::new();
238
239 for (pubkey, task) in &tasks {
241 if seen_ids.insert(task.id) {
242 if *failed {
243 if let Some(sim_result) = simulate_task(&client, *pubkey).await? {
244 if sim_result.error.is_some() {
245 to_close.push(task.clone());
246 }
247 }
248 } else {
249 to_close.push(task.clone());
250 }
251 }
252 }
253
254 let ixs = to_close
255 .into_iter()
256 .map(|task| {
257 tuktuk::task::dequeue_ix(
258 task_queue_pubkey,
259 client.payer.pubkey(),
260 task.rent_refund,
261 task.id,
262 )
263 .map_err(|e| anyhow!("Failed to dequeue task: {}", e))
264 })
265 .collect::<Result<Vec<_>>>()?;
266
267 let groups = pack_instructions_into_transactions(
268 ixs.into_iter().map(|ix| vec![ix]).collect(),
269 &client.payer,
270 None,
271 )?;
272
273 for mut to_send in groups {
274 to_send.instructions.remove(0);
276 to_send.instructions.remove(0);
277 send_instructions(
278 client.rpc_client.clone(),
279 &client.payer,
280 client.opts.ws_url().as_str(),
281 to_send.instructions,
282 &[],
283 )
284 .await?;
285 }
286 }
287 Cmd::Run {
288 task_queue,
289 id,
290 skip_preflight,
291 description,
292 } => {
293 if id.is_none() && description.is_none() {
294 return Err(anyhow!("Either id or description must be provided"));
295 }
296 if id.is_some() && description.is_some() {
297 return Err(anyhow!("Only one of id or description can be provided"));
298 }
299 let client = opts.client().await?;
300 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
301 let task_queue: TaskQueueV0 = client
302 .as_ref()
303 .anchor_account(&task_queue_pubkey)
304 .await?
305 .ok_or_else(|| anyhow!("Task queue account not found"))?;
306 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
307 let tasks = if let Some(id) = id {
308 let task_key = tuktuk::task::key(&task_queue_pubkey, *id);
309 let task = client
310 .as_ref()
311 .anchor_account::<TaskV0>(&task_key)
312 .await?
313 .ok_or_else(|| anyhow!("Task not found"))?;
314 vec![(task_key, task)]
315 } else if let Some(description) = description {
316 let tasks = client
317 .as_ref()
318 .anchor_accounts::<TaskV0>(&task_keys)
319 .await?;
320 tasks
321 .into_iter()
322 .filter(|(_, task)| {
323 if let Some(task) = task {
324 return task.description.starts_with(description);
325 }
326 false
327 })
328 .map(|(p, task)| (p, task.unwrap().clone()))
329 .collect()
330 } else {
331 vec![]
332 };
333 for (task_key, _) in tasks {
334 let run_ix_result = tuktuk_sdk::compiled_transaction::run_ix(
335 client.as_ref(),
336 task_key,
337 client.payer.pubkey(),
338 &HashSet::new(),
339 )
340 .await;
341 match run_ix_result {
342 Ok(Some(run_ix)) => {
343 let blockhash = client.rpc_client.get_latest_blockhash().await?;
344 let (computed, _) = auto_compute_limit_and_price(
345 &client.rpc_client,
346 run_ix.instructions,
347 1.2,
348 &client.payer.pubkey(),
349 Some(blockhash),
350 Some(run_ix.lookup_tables.clone()),
351 )
352 .await
353 .unwrap();
354
355 let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
356 let message = VersionedMessage::V0(v0::Message::try_compile(
357 &client.payer.pubkey(),
358 &computed,
359 &run_ix.lookup_tables,
360 recent_blockhash,
361 )?);
362 let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
363 let txid = client
364 .rpc_client
365 .send_transaction_with_config(
366 &tx,
367 solana_client::rpc_config::RpcSendTransactionConfig {
368 skip_preflight: *skip_preflight,
369 preflight_commitment: Some(CommitmentLevel::Confirmed),
370 ..Default::default()
371 },
372 )
373 .await?;
374
375 println!("Tx sent: {}", txid);
376 }
377 Err(e) => {
378 println!("Error running task: {}", e);
379 }
380 _ => {}
381 }
382 }
383 }
384 }
385 Ok(())
386 }
387}
388
389#[derive(Serialize)]
390struct Task {
391 #[serde(with = "serde_pubkey")]
392 pub pubkey: Pubkey,
393 pub id: u16,
394 pub description: String,
395 #[serde(with = "serde_pubkey")]
396 pub rent_refund: Pubkey,
397 pub trigger: Trigger,
398 pub crank_reward: u64,
399 pub simulation_result: Option<SimulationResult>,
400 pub transaction: Option<TransactionSource>,
401}
402
403#[derive(Serialize)]
404struct SimulationResult {
405 pub error: Option<String>,
406 pub logs: Option<Vec<String>>,
407 pub compute_units: Option<u64>,
408}
409
410#[derive(Serialize)]
411enum Trigger {
412 Now,
413 Timestamp(i64),
414}
415
416impl From<TriggerV0> for Trigger {
417 fn from(trigger: TriggerV0) -> Self {
418 match trigger {
419 TriggerV0::Now => Trigger::Now,
420 TriggerV0::Timestamp(ts) => Trigger::Timestamp(ts),
421 }
422 }
423}