1use std::collections::HashSet;
2
3use anyhow::anyhow;
4use clap::{Args, Subcommand};
5use clock::SYSVAR_CLOCK;
6use serde::Serialize;
7use solana_client::rpc_config::RpcSimulateTransactionConfig;
8use solana_sdk::{
9 commitment_config::CommitmentLevel,
10 message::{v0, VersionedMessage},
11 pubkey::Pubkey,
12 signer::Signer,
13 transaction::VersionedTransaction,
14};
15use solana_transaction_utils::{
16 pack::pack_instructions_into_transactions, priority_fee::auto_compute_limit_and_price,
17};
18use tuktuk_program::{types::TriggerV0, TaskQueueV0, TaskV0};
19use tuktuk_sdk::prelude::*;
20
21use super::{task_queue::TaskQueueArg, TransactionSource};
22use crate::{
23 client::send_instructions,
24 cmd::Opts,
25 result::Result,
26 serde::{print_json, serde_pubkey},
27};
28
29#[derive(Debug, Args)]
30pub struct TaskCmd {
31 #[arg(long, default_value = "false")]
32 pub verbose: bool,
33 #[command(subcommand)]
34 pub cmd: Cmd,
35}
36
37#[derive(Debug, Subcommand)]
38pub enum Cmd {
39 List {
40 #[command(flatten)]
41 task_queue: TaskQueueArg,
42 #[arg(long)]
44 description: Option<String>,
45 },
46 Run {
47 #[command(flatten)]
48 task_queue: TaskQueueArg,
49 #[arg(short, long)]
50 id: Option<u16>,
51 #[arg(long)]
53 description: Option<String>,
54 #[arg(short, long, default_value = "false")]
55 skip_preflight: bool,
56 },
57 Close {
58 #[command(flatten)]
59 task_queue: TaskQueueArg,
60 #[arg(short, long)]
61 id: Option<u16>,
62 #[arg(long)]
64 description: Option<String>,
65 },
66}
67
68impl TaskCmd {
69 pub async fn run(&self, opts: Opts) -> Result {
70 match &self.cmd {
71 Cmd::List {
72 task_queue,
73 description,
74 } => {
75 let client = opts.client().await?;
76 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
77
78 let task_queue: TaskQueueV0 = client
79 .as_ref()
80 .anchor_account(&task_queue_pubkey)
81 .await?
82 .ok_or_else(|| anyhow!("Topic account not found"))?;
83 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
84 let tasks = client
85 .as_ref()
86 .anchor_accounts::<TaskV0>(&task_keys)
87 .await?;
88 let filtered_tasks = tasks.into_iter().filter(|(_, task)| {
89 if let Some(task) = task {
90 if let Some(description) = description {
91 return task.description.starts_with(description);
92 }
93 }
94 true
95 });
96
97 let clock_acc = client.rpc_client.get_account(&SYSVAR_CLOCK).await?;
98 let clock: solana_sdk::clock::Clock = bincode::deserialize(&clock_acc.data)?;
99 let now = clock.unix_timestamp;
100
101 let mut json_tasks = Vec::new();
102 for (pubkey, maybe_task) in filtered_tasks {
103 if let Some(task) = maybe_task {
104 let mut simulation_result = None;
105 if task.trigger.is_active(now) {
106 if let Ok(Some(run_ix)) = tuktuk_sdk::compiled_transaction::run_ix(
108 client.as_ref(),
109 pubkey,
110 client.payer.pubkey(),
111 &HashSet::new(),
112 )
113 .await
114 {
115 let mut updated_instructions = vec![
117 solana_sdk::compute_budget::ComputeBudgetInstruction::set_compute_unit_limit(
118 1900000,
119 ),
120 ];
121 updated_instructions.extend(run_ix.instructions.clone());
122 let recent_blockhash =
123 client.rpc_client.get_latest_blockhash().await?;
124 let message = VersionedMessage::V0(v0::Message::try_compile(
125 &client.payer.pubkey(),
126 &updated_instructions,
127 &run_ix.lookup_tables,
128 recent_blockhash,
129 )?);
130 let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
131 let sim_result = client
132 .rpc_client
133 .simulate_transaction_with_config(
134 &tx,
135 RpcSimulateTransactionConfig {
136 commitment: Some(solana_sdk::commitment_config::CommitmentConfig::confirmed()),
137 sig_verify: true,
138 ..Default::default()
139 },
140 )
141 .await;
142
143 match sim_result {
144 Ok(simulated) => {
145 simulation_result = Some(SimulationResult {
146 error: simulated.value.err.map(|e| e.to_string()),
147 logs: Some(simulated.value.logs.unwrap_or_default()),
148 compute_units: simulated.value.units_consumed,
149 });
150 }
151 Err(err) => {
152 simulation_result = Some(SimulationResult {
153 error: Some(err.to_string()),
154 logs: None,
155 compute_units: None,
156 });
157 }
158 }
159 }
160 }
161
162 json_tasks.push(Task {
163 pubkey,
164 id: task.id,
165 description: task.description,
166 trigger: Trigger::from(task.trigger),
167 crank_reward: task.crank_reward,
168 rent_refund: task.rent_refund,
169 simulation_result,
170 transaction: if self.verbose {
171 Some(TransactionSource::from(task.transaction.clone()))
172 } else {
173 None
174 },
175 });
176 }
177 }
178 print_json(&json_tasks)?;
179 }
180 Cmd::Close {
181 task_queue,
182 id: index,
183 description,
184 } => {
185 if index.is_none() && description.is_none() {
186 return Err(anyhow!("Either id or description must be provided"));
187 }
188 if index.is_some() && description.is_some() {
189 return Err(anyhow!("Only one of id or description can be provided"));
190 }
191 let client = opts.client().await?;
192 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
193 let task_queue: TaskQueueV0 = client
194 .as_ref()
195 .anchor_account(&task_queue_pubkey)
196 .await?
197 .ok_or_else(|| anyhow!("Task queue account not found"))?;
198 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
199 let tasks = if let Some(index) = index {
200 let task_key = tuktuk::task::key(&task_queue_pubkey, *index);
201 let task = client
202 .as_ref()
203 .anchor_account::<TaskV0>(&task_key)
204 .await?
205 .ok_or_else(|| anyhow!("Task not found"))?;
206 vec![(task_key, task)]
207 } else if let Some(description) = description {
208 let tasks = client
209 .as_ref()
210 .anchor_accounts::<TaskV0>(&task_keys)
211 .await?;
212 tasks
213 .into_iter()
214 .filter(|(_, task)| {
215 if let Some(task) = task {
216 return task.description.starts_with(description);
217 }
218 false
219 })
220 .map(|(p, task)| (p, task.unwrap().clone()))
221 .collect()
222 } else {
223 vec![]
224 };
225 let mut seen_ids = HashSet::new();
226 let ixs = tasks
227 .into_iter()
228 .filter(|(_, task)| seen_ids.insert(task.id)) .map(|(_, task)| {
230 tuktuk::task::dequeue_ix(
231 task_queue_pubkey,
232 client.payer.pubkey(),
233 task.rent_refund,
234 task.id,
235 )
236 .map_err(|e| anyhow!("Failed to dequeue task: {}", e))
237 })
238 .collect::<Result<Vec<_>>>()?;
239
240 let groups = pack_instructions_into_transactions(
241 ixs.into_iter().map(|ix| vec![ix]).collect(),
242 &client.payer,
243 None,
244 )?;
245
246 for (mut to_send, _) in groups {
247 to_send.remove(0);
249 to_send.remove(0);
250 send_instructions(
251 client.rpc_client.clone(),
252 &client.payer,
253 client.opts.ws_url().as_str(),
254 to_send,
255 &[],
256 )
257 .await?;
258 }
259 }
260 Cmd::Run {
261 task_queue,
262 id,
263 skip_preflight,
264 description,
265 } => {
266 if id.is_none() && description.is_none() {
267 return Err(anyhow!("Either id or description must be provided"));
268 }
269 if id.is_some() && description.is_some() {
270 return Err(anyhow!("Only one of id or description can be provided"));
271 }
272 let client = opts.client().await?;
273 let task_queue_pubkey = task_queue.get_pubkey(&client).await?.unwrap();
274 let task_queue: TaskQueueV0 = client
275 .as_ref()
276 .anchor_account(&task_queue_pubkey)
277 .await?
278 .ok_or_else(|| anyhow!("Task queue account not found"))?;
279 let task_keys = tuktuk::task::keys(&task_queue_pubkey, &task_queue)?;
280 let tasks = if let Some(id) = id {
281 let task_key = tuktuk::task::key(&task_queue_pubkey, *id);
282 let task = client
283 .as_ref()
284 .anchor_account::<TaskV0>(&task_key)
285 .await?
286 .ok_or_else(|| anyhow!("Task not found"))?;
287 vec![(task_key, task)]
288 } else if let Some(description) = description {
289 let tasks = client
290 .as_ref()
291 .anchor_accounts::<TaskV0>(&task_keys)
292 .await?;
293 tasks
294 .into_iter()
295 .filter(|(_, task)| {
296 if let Some(task) = task {
297 return task.description.starts_with(description);
298 }
299 false
300 })
301 .map(|(p, task)| (p, task.unwrap().clone()))
302 .collect()
303 } else {
304 vec![]
305 };
306 for (task_key, _) in tasks {
307 let run_ix_result = tuktuk_sdk::compiled_transaction::run_ix(
308 client.as_ref(),
309 task_key,
310 client.payer.pubkey(),
311 &HashSet::new(),
312 )
313 .await;
314 match run_ix_result {
315 Ok(Some(run_ix)) => {
316 let blockhash = client.rpc_client.get_latest_blockhash().await?;
317 let (computed, _) = auto_compute_limit_and_price(
318 &client.rpc_client,
319 run_ix.instructions,
320 1.2,
321 &client.payer.pubkey(),
322 Some(blockhash),
323 Some(run_ix.lookup_tables.clone()),
324 )
325 .await
326 .unwrap();
327
328 let recent_blockhash = client.rpc_client.get_latest_blockhash().await?;
329 let message = VersionedMessage::V0(v0::Message::try_compile(
330 &client.payer.pubkey(),
331 &computed,
332 &run_ix.lookup_tables,
333 recent_blockhash,
334 )?);
335 let tx = VersionedTransaction::try_new(message, &[&client.payer])?;
336 let txid = client
337 .rpc_client
338 .send_transaction_with_config(
339 &tx,
340 solana_client::rpc_config::RpcSendTransactionConfig {
341 skip_preflight: *skip_preflight,
342 preflight_commitment: Some(CommitmentLevel::Confirmed),
343 ..Default::default()
344 },
345 )
346 .await?;
347
348 println!("Tx sent: {}", txid);
349 }
350 Err(e) => {
351 println!("Error running task: {}", e);
352 }
353 _ => {}
354 }
355 }
356 }
357 }
358 Ok(())
359 }
360}
361
362#[derive(Serialize)]
363struct Task {
364 #[serde(with = "serde_pubkey")]
365 pub pubkey: Pubkey,
366 pub id: u16,
367 pub description: String,
368 #[serde(with = "serde_pubkey")]
369 pub rent_refund: Pubkey,
370 pub trigger: Trigger,
371 pub crank_reward: u64,
372 pub simulation_result: Option<SimulationResult>,
373 pub transaction: Option<TransactionSource>,
374}
375
376#[derive(Serialize)]
377struct SimulationResult {
378 pub error: Option<String>,
379 pub logs: Option<Vec<String>>,
380 pub compute_units: Option<u64>,
381}
382
383#[derive(Serialize)]
384enum Trigger {
385 Now,
386 Timestamp(i64),
387}
388
389impl From<TriggerV0> for Trigger {
390 fn from(trigger: TriggerV0) -> Self {
391 match trigger {
392 TriggerV0::Now => Trigger::Now,
393 TriggerV0::Timestamp(ts) => Trigger::Timestamp(ts),
394 }
395 }
396}