1use std::str::FromStr;
2
3use clap::{Args, Subcommand};
4use serde::Serialize;
5use solana_sdk::{
6 instruction::Instruction, pubkey::Pubkey, signer::Signer, system_instruction::transfer,
7};
8use tuktuk::task_queue_name_mapping_key;
9use tuktuk_program::TaskQueueV0;
10use tuktuk_sdk::prelude::*;
11
12use crate::{
13 client::{send_instructions, CliClient},
14 cmd::Opts,
15 result::Result,
16 serde::{print_json, serde_pubkey},
17};
18
19#[derive(Debug, Args)]
20pub struct TaskQueueCmd {
21 #[command(subcommand)]
22 pub cmd: Cmd,
23}
24
25#[derive(Debug, Subcommand)]
26pub enum Cmd {
27 Create {
28 #[arg(long)]
29 queue_authority: Option<Pubkey>,
30 #[arg(long)]
31 update_authority: Option<Pubkey>,
32 #[arg(long)]
33 capacity: u16,
34 #[arg(long)]
35 name: String,
36 #[arg(
37 long,
38 help = "Initial funding amount in lamports. Task queue funding is only required to pay extra rent for tasks that run as a result of other tasks.",
39 default_value = "0"
40 )]
41 funding_amount: u64,
42 #[arg(long, help = "Default crank reward in lamports")]
43 min_crank_reward: u64,
44 #[arg(long, help = "Lookup tables to create")]
45 lookup_tables: Option<Vec<Pubkey>>,
46 #[arg(
47 long,
48 help = "Age before a task is considered stale and can be deleted without running the instructions. This is effectively the retention rate for debugging purposes."
49 )]
50 stale_task_age: u32,
51 },
52 Update {
53 #[command(flatten)]
54 task_queue: TaskQueueArg,
55 #[arg(long, help = "Default crank reward in lamports")]
56 min_crank_reward: Option<u64>,
57 #[arg(long, help = "Lookup tables to create")]
58 lookup_tables: Option<Vec<Pubkey>>,
59 #[arg(long)]
60 update_authority: Option<Pubkey>,
61 #[arg(long)]
62 capacity: Option<u16>,
63 #[arg(
64 long,
65 help = "Age before a task is considered stale and can be deleted without running the instructions. This is effectively the retention rate for debugging purposes."
66 )]
67 stale_task_age: Option<u32>,
68 },
69 Get {
70 #[command(flatten)]
71 task_queue: TaskQueueArg,
72 },
73 Fund {
74 #[command(flatten)]
75 task_queue: TaskQueueArg,
76 #[arg(long, help = "Amount to fund the task queue with, in lamports")]
77 amount: u64,
78 },
79 Close {
80 #[command(flatten)]
81 task_queue: TaskQueueArg,
82 },
83 AddQueueAuthority {
84 #[command(flatten)]
85 task_queue: TaskQueueArg,
86 #[arg(long, help = "Authority to add")]
87 queue_authority: Pubkey,
88 },
89 RemoveQueueAuthority {
90 #[command(flatten)]
91 task_queue: TaskQueueArg,
92 #[arg(long, help = "Authority to remove")]
93 queue_authority: Pubkey,
94 },
95}
96
97#[derive(Debug, Args)]
98pub struct TaskQueueArg {
99 #[arg(long = "task-queue-name", name = "task-queue-name")]
100 pub name: Option<String>,
101 #[arg(long = "task-queue-id", name = "task-queue-id")]
102 pub id: Option<u32>,
103 #[arg(long = "task-queue-pubkey", name = "task-queue-pubkey")]
104 pub pubkey: Option<String>,
105}
106
107impl TaskQueueArg {
108 pub async fn get_pubkey(&self, client: &CliClient) -> Result<Option<Pubkey>> {
109 let tuktuk_config_key = tuktuk::config_key();
110
111 if let Some(pubkey) = &self.pubkey {
112 Ok(Some(Pubkey::from_str(pubkey)?))
114 } else if let Some(id) = self.id {
115 Ok(Some(tuktuk::task_queue::key(&tuktuk_config_key, id)))
116 } else if let Some(name) = &self.name {
117 let mapping: tuktuk_program::TaskQueueNameMappingV0 = client
118 .as_ref()
119 .anchor_account(&task_queue_name_mapping_key(&tuktuk_config_key, name))
120 .await?
121 .ok_or_else(|| anyhow::anyhow!("Task queue not found"))?;
122 Ok(Some(mapping.task_queue))
123 } else {
124 Ok(None)
125 }
126 }
127}
128
129impl TaskQueueCmd {
130 async fn fund_task_queue_ix(
131 client: &CliClient,
132 task_queue_key: &Pubkey,
133 amount: u64,
134 ) -> Result<Instruction> {
135 let ix = transfer(&client.payer.pubkey(), task_queue_key, amount);
136
137 Ok(ix)
138 }
139
140 pub async fn run(&self, opts: Opts) -> Result {
141 match &self.cmd {
142 Cmd::Create {
143 queue_authority,
144 update_authority,
145 capacity,
146 name,
147 min_crank_reward,
148 funding_amount,
149 lookup_tables,
150 stale_task_age,
151 } => {
152 let client = opts.client().await?;
153
154 let (key, ix) = tuktuk::task_queue::create(
155 client.rpc_client.as_ref(),
156 client.payer.pubkey(),
157 tuktuk_program::types::InitializeTaskQueueArgsV0 {
158 capacity: *capacity,
159 min_crank_reward: *min_crank_reward,
160 name: name.clone(),
161 lookup_tables: lookup_tables.clone().unwrap_or_default(),
162 stale_task_age: *stale_task_age,
163 },
164 *update_authority,
165 )
166 .await?;
167 let add_queue_authority_ix = tuktuk::task_queue::add_queue_authority_ix(
168 client.payer.pubkey(),
169 key,
170 queue_authority.unwrap_or(client.payer.pubkey()),
171 update_authority.unwrap_or(client.payer.pubkey()),
172 )?;
173 let fund_ix = Self::fund_task_queue_ix(&client, &key, *funding_amount).await?;
175
176 send_instructions(
177 client.rpc_client.clone(),
178 &client.payer,
179 client.opts.ws_url().as_str(),
180 &[fund_ix, ix, add_queue_authority_ix],
181 &[],
182 )
183 .await?;
184
185 let task_queue: TaskQueueV0 = client
186 .as_ref()
187 .anchor_account(&key)
188 .await?
189 .ok_or_else(|| anyhow::anyhow!("Task queue not found: {}", key))?;
190 let task_queue_balance = client.rpc_client.get_balance(&key).await?;
191
192 print_json(&TaskQueue {
193 pubkey: key,
194 id: task_queue.id,
195 name: name.clone(),
196 capacity: task_queue.capacity,
197 update_authority: task_queue.update_authority,
198 min_crank_reward: task_queue.min_crank_reward,
199 balance: task_queue_balance,
200 stale_task_age: *stale_task_age,
201 })?;
202 }
203 Cmd::Update {
204 task_queue,
205 min_crank_reward,
206 lookup_tables,
207 update_authority,
208 capacity,
209 stale_task_age,
210 } => {
211 let client = opts.client().await?;
212 let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
213 anyhow::anyhow!(
214 "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
215 )
216 })?;
217 let ix = tuktuk::task_queue::update(
218 client.rpc_client.as_ref(),
219 client.payer.pubkey(),
220 task_queue_key,
221 tuktuk_program::types::UpdateTaskQueueArgsV0 {
222 capacity: *capacity,
223 min_crank_reward: *min_crank_reward,
224 lookup_tables: lookup_tables.clone(),
225 update_authority: *update_authority,
226 stale_task_age: *stale_task_age,
227 },
228 )
229 .await?;
230
231 send_instructions(
232 client.rpc_client.clone(),
233 &client.payer,
234 client.opts.ws_url().as_str(),
235 &[ix],
236 &[],
237 )
238 .await?;
239 }
240 Cmd::Get { task_queue } => {
241 let client = opts.client().await?;
242 let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
243 anyhow::anyhow!(
244 "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
245 )
246 })?;
247 let task_queue: TaskQueueV0 = client
248 .rpc_client
249 .anchor_account(&task_queue_key)
250 .await?
251 .ok_or_else(|| anyhow::anyhow!("Task queue not found: {}", task_queue_key))?;
252
253 let task_queue_balance = client.rpc_client.get_balance(&task_queue_key).await?;
254 let serializable = TaskQueue {
255 pubkey: task_queue_key,
256 id: task_queue.id,
257 capacity: task_queue.capacity,
258 update_authority: task_queue.update_authority,
259 name: task_queue.name,
260 min_crank_reward: task_queue.min_crank_reward,
261 balance: task_queue_balance,
262 stale_task_age: task_queue.stale_task_age,
263 };
264 print_json(&serializable)?;
265 }
266 Cmd::Fund { task_queue, amount } => {
267 let client = opts.client().await?;
268 let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
269 anyhow::anyhow!(
270 "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
271 )
272 })?;
273
274 let fund_ix = Self::fund_task_queue_ix(&client, &task_queue_key, *amount).await?;
275 send_instructions(
276 client.rpc_client.clone(),
277 &client.payer,
278 client.opts.ws_url().as_str(),
279 &[fund_ix],
280 &[],
281 )
282 .await?;
283 }
284 Cmd::AddQueueAuthority {
285 task_queue,
286 queue_authority,
287 } => {
288 let client = opts.client().await?;
289 let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
290 anyhow::anyhow!(
291 "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
292 )
293 })?;
294 let ix = tuktuk::task_queue::add_queue_authority(
295 client.rpc_client.as_ref(),
296 client.payer.pubkey(),
297 task_queue_key,
298 *queue_authority,
299 )
300 .await?;
301 send_instructions(
302 client.rpc_client.clone(),
303 &client.payer,
304 client.opts.ws_url().as_str(),
305 &[ix],
306 &[],
307 )
308 .await?;
309 }
310 Cmd::RemoveQueueAuthority {
311 task_queue,
312 queue_authority,
313 } => {
314 let client = opts.client().await?;
315 let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
316 anyhow::anyhow!(
317 "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
318 )
319 })?;
320 let ix = tuktuk::task_queue::remove_queue_authority(
321 client.rpc_client.as_ref(),
322 client.payer.pubkey(),
323 task_queue_key,
324 *queue_authority,
325 )
326 .await?;
327 send_instructions(
328 client.rpc_client.clone(),
329 &client.payer,
330 client.opts.ws_url().as_str(),
331 &[ix],
332 &[],
333 )
334 .await?;
335 }
336 Cmd::Close { task_queue } => {
337 let client = opts.client().await?;
338 let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
339 anyhow::anyhow!(
340 "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
341 )
342 })?;
343
344 let ix = tuktuk::task_queue::close(
345 client.rpc_client.as_ref(),
346 task_queue_key,
347 client.payer.pubkey(),
348 client.payer.pubkey(),
349 )
350 .await?;
351 send_instructions(
352 client.rpc_client.clone(),
353 &client.payer,
354 client.opts.ws_url().as_str(),
355 &[ix],
356 &[],
357 )
358 .await?;
359 }
360 }
361
362 Ok(())
363 }
364}
365
366#[derive(Serialize)]
367pub struct TaskQueue {
368 #[serde(with = "serde_pubkey")]
369 pub pubkey: Pubkey,
370 pub id: u32,
371 pub capacity: u16,
372 #[serde(with = "serde_pubkey")]
373 pub update_authority: Pubkey,
374 pub name: String,
375 pub min_crank_reward: u64,
376 pub balance: u64,
377 pub stale_task_age: u32,
378}