tuktuk_cli/cmd/
task_queue.rs

1use std::str::FromStr;
2
3use clap::{Args, Subcommand};
4use serde::Serialize;
5use solana_sdk::{
6    instruction::Instruction, pubkey::Pubkey, signer::Signer, system_instruction::transfer,
7};
8use tuktuk::task_queue_name_mapping_key;
9use tuktuk_program::{TaskQueueV0, TuktukConfigV0};
10use tuktuk_sdk::prelude::*;
11
12use crate::{
13    client::{send_instructions, CliClient},
14    cmd::Opts,
15    result::Result,
16    serde::{print_json, serde_pubkey},
17};
18
19#[derive(Debug, Args)]
20pub struct TaskQueueCmd {
21    #[command(subcommand)]
22    pub cmd: Cmd,
23}
24
25#[derive(Debug, Subcommand)]
26pub enum Cmd {
27    Create {
28        #[arg(long)]
29        queue_authority: Option<Pubkey>,
30        #[arg(long)]
31        update_authority: Option<Pubkey>,
32        #[arg(long)]
33        capacity: u16,
34        #[arg(long)]
35        name: String,
36        #[arg(
37            long,
38            help = "Initial funding amount in lamports. Task queue funding is only required to pay extra rent for tasks that run as a result of other tasks.",
39            default_value = "0"
40        )]
41        funding_amount: u64,
42        #[arg(long, help = "Default crank reward in lamports")]
43        min_crank_reward: u64,
44        #[arg(long, help = "Lookup tables to create")]
45        lookup_tables: Option<Vec<Pubkey>>,
46        #[arg(
47            long,
48            help = "Age before a task is considered stale and can be deleted without running the instructions. This is effectively the retention rate for debugging purposes."
49        )]
50        stale_task_age: u32,
51    },
52    Update {
53        #[command(flatten)]
54        task_queue: TaskQueueArg,
55        #[arg(long, help = "Default crank reward in lamports")]
56        min_crank_reward: Option<u64>,
57        #[arg(long, help = "Lookup tables to create")]
58        lookup_tables: Option<Vec<Pubkey>>,
59        #[arg(long)]
60        update_authority: Option<Pubkey>,
61        #[arg(long)]
62        capacity: Option<u16>,
63        #[arg(
64            long,
65            help = "Age before a task is considered stale and can be deleted without running the instructions. This is effectively the retention rate for debugging purposes."
66        )]
67        stale_task_age: Option<u32>,
68    },
69    Get {
70        #[command(flatten)]
71        task_queue: TaskQueueArg,
72    },
73    Fund {
74        #[command(flatten)]
75        task_queue: TaskQueueArg,
76        #[arg(long, help = "Amount to fund the task queue with, in lamports")]
77        amount: u64,
78    },
79    Close {
80        #[command(flatten)]
81        task_queue: TaskQueueArg,
82    },
83    AddQueueAuthority {
84        #[command(flatten)]
85        task_queue: TaskQueueArg,
86        #[arg(long, help = "Authority to add")]
87        queue_authority: Pubkey,
88    },
89    RemoveQueueAuthority {
90        #[command(flatten)]
91        task_queue: TaskQueueArg,
92        #[arg(long, help = "Authority to remove")]
93        queue_authority: Pubkey,
94    },
95}
96
97#[derive(Debug, Args)]
98pub struct TaskQueueArg {
99    #[arg(long = "task-queue-name", name = "task-queue-name")]
100    pub name: Option<String>,
101    #[arg(long = "task-queue-id", name = "task-queue-id")]
102    pub id: Option<u32>,
103    #[arg(long = "task-queue-pubkey", name = "task-queue-pubkey")]
104    pub pubkey: Option<String>,
105}
106
107impl TaskQueueArg {
108    pub async fn get_pubkey(&self, client: &CliClient) -> Result<Option<Pubkey>> {
109        let tuktuk_config_key = tuktuk::config_key();
110
111        if let Some(pubkey) = &self.pubkey {
112            // Use the provided pubkey directly
113            Ok(Some(Pubkey::from_str(pubkey)?))
114        } else if let Some(id) = self.id {
115            Ok(Some(tuktuk::task_queue::key(&tuktuk_config_key, id)))
116        } else if let Some(name) = &self.name {
117            let mapping: tuktuk_program::TaskQueueNameMappingV0 = client
118                .as_ref()
119                .anchor_account(&task_queue_name_mapping_key(&tuktuk_config_key, name))
120                .await?
121                .ok_or_else(|| anyhow::anyhow!("Task queue not found"))?;
122            Ok(Some(mapping.task_queue))
123        } else {
124            Ok(None)
125        }
126    }
127}
128
129impl TaskQueueCmd {
130    async fn fund_task_queue_ix(
131        client: &CliClient,
132        task_queue_key: &Pubkey,
133        amount: u64,
134    ) -> Result<Instruction> {
135        let ix = transfer(&client.payer.pubkey(), task_queue_key, amount);
136
137        Ok(ix)
138    }
139
140    pub async fn run(&self, opts: Opts) -> Result {
141        match &self.cmd {
142            Cmd::Create {
143                queue_authority,
144                update_authority,
145                capacity,
146                name,
147                min_crank_reward,
148                funding_amount,
149                lookup_tables,
150                stale_task_age,
151            } => {
152                let client = opts.client().await?;
153
154                let (key, ix) = tuktuk::task_queue::create(
155                    client.rpc_client.as_ref(),
156                    client.payer.pubkey(),
157                    tuktuk_program::types::InitializeTaskQueueArgsV0 {
158                        capacity: *capacity,
159                        min_crank_reward: *min_crank_reward,
160                        name: name.clone(),
161                        lookup_tables: lookup_tables.clone().unwrap_or_default(),
162                        stale_task_age: *stale_task_age,
163                    },
164                    *update_authority,
165                )
166                .await?;
167                let add_queue_authority_ix = tuktuk::task_queue::add_queue_authority_ix(
168                    client.payer.pubkey(),
169                    key,
170                    queue_authority.unwrap_or(client.payer.pubkey()),
171                    update_authority.unwrap_or(client.payer.pubkey()),
172                )?;
173                // Fund if amount specified
174                let config: TuktukConfigV0 = client
175                    .as_ref()
176                    .anchor_account(&tuktuk::config_key())
177                    .await?
178                    .ok_or_else(|| anyhow::anyhow!("Tuktuk config account not found"))?;
179                if *funding_amount < config.min_deposit {
180                    return Err(anyhow::anyhow!(
181                        "Funding amount must be greater than the minimum deposit: {}",
182                        config.min_deposit
183                    ));
184                }
185                let fund_ix = Self::fund_task_queue_ix(&client, &key, *funding_amount).await?;
186
187                send_instructions(
188                    client.rpc_client.clone(),
189                    &client.payer,
190                    client.opts.ws_url().as_str(),
191                    vec![fund_ix, ix, add_queue_authority_ix],
192                    &[],
193                )
194                .await?;
195
196                let task_queue: TaskQueueV0 = client
197                    .as_ref()
198                    .anchor_account(&key)
199                    .await?
200                    .ok_or_else(|| anyhow::anyhow!("Task queue not found: {}", key))?;
201                let task_queue_balance = client.rpc_client.get_balance(&key).await?;
202
203                print_json(&TaskQueue {
204                    pubkey: key,
205                    id: task_queue.id,
206                    name: name.clone(),
207                    capacity: task_queue.capacity,
208                    update_authority: task_queue.update_authority,
209                    min_crank_reward: task_queue.min_crank_reward,
210                    balance: task_queue_balance,
211                })?;
212            }
213            Cmd::Update {
214                task_queue,
215                min_crank_reward,
216                lookup_tables,
217                update_authority,
218                capacity,
219                stale_task_age,
220            } => {
221                let client = opts.client().await?;
222                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
223                    anyhow::anyhow!(
224                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
225                    )
226                })?;
227                let ix = tuktuk::task_queue::update(
228                    client.rpc_client.as_ref(),
229                    client.payer.pubkey(),
230                    task_queue_key,
231                    tuktuk_program::types::UpdateTaskQueueArgsV0 {
232                        capacity: *capacity,
233                        min_crank_reward: *min_crank_reward,
234                        lookup_tables: lookup_tables.clone(),
235                        update_authority: *update_authority,
236                        stale_task_age: *stale_task_age,
237                    },
238                )
239                .await?;
240
241                send_instructions(
242                    client.rpc_client.clone(),
243                    &client.payer,
244                    client.opts.ws_url().as_str(),
245                    vec![ix],
246                    &[],
247                )
248                .await?;
249            }
250            Cmd::Get { task_queue } => {
251                let client = opts.client().await?;
252                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
253                    anyhow::anyhow!(
254                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
255                    )
256                })?;
257                let task_queue: TaskQueueV0 = client
258                    .rpc_client
259                    .anchor_account(&task_queue_key)
260                    .await?
261                    .ok_or_else(|| anyhow::anyhow!("Task queue not found: {}", task_queue_key))?;
262
263                let task_queue_balance = client.rpc_client.get_balance(&task_queue_key).await?;
264                let serializable = TaskQueue {
265                    pubkey: task_queue_key,
266                    id: task_queue.id,
267                    capacity: task_queue.capacity,
268                    update_authority: task_queue.update_authority,
269                    name: task_queue.name,
270                    min_crank_reward: task_queue.min_crank_reward,
271                    balance: task_queue_balance,
272                };
273                print_json(&serializable)?;
274            }
275            Cmd::Fund { task_queue, amount } => {
276                let client = opts.client().await?;
277                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
278                    anyhow::anyhow!(
279                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
280                    )
281                })?;
282
283                let fund_ix = Self::fund_task_queue_ix(&client, &task_queue_key, *amount).await?;
284                send_instructions(
285                    client.rpc_client.clone(),
286                    &client.payer,
287                    client.opts.ws_url().as_str(),
288                    vec![fund_ix],
289                    &[],
290                )
291                .await?;
292            }
293            Cmd::AddQueueAuthority {
294                task_queue,
295                queue_authority,
296            } => {
297                let client = opts.client().await?;
298                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
299                    anyhow::anyhow!(
300                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
301                    )
302                })?;
303                let ix = tuktuk::task_queue::add_queue_authority(
304                    client.rpc_client.as_ref(),
305                    client.payer.pubkey(),
306                    task_queue_key,
307                    *queue_authority,
308                )
309                .await?;
310                send_instructions(
311                    client.rpc_client.clone(),
312                    &client.payer,
313                    client.opts.ws_url().as_str(),
314                    vec![ix],
315                    &[],
316                )
317                .await?;
318            }
319            Cmd::RemoveQueueAuthority {
320                task_queue,
321                queue_authority,
322            } => {
323                let client = opts.client().await?;
324                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
325                    anyhow::anyhow!(
326                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
327                    )
328                })?;
329                let ix = tuktuk::task_queue::remove_queue_authority(
330                    client.rpc_client.as_ref(),
331                    client.payer.pubkey(),
332                    task_queue_key,
333                    *queue_authority,
334                )
335                .await?;
336                send_instructions(
337                    client.rpc_client.clone(),
338                    &client.payer,
339                    client.opts.ws_url().as_str(),
340                    vec![ix],
341                    &[],
342                )
343                .await?;
344            }
345            Cmd::Close { task_queue } => {
346                let client = opts.client().await?;
347                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
348                    anyhow::anyhow!(
349                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
350                    )
351                })?;
352
353                let ix = tuktuk::task_queue::close(
354                    client.rpc_client.as_ref(),
355                    task_queue_key,
356                    client.payer.pubkey(),
357                    client.payer.pubkey(),
358                )
359                .await?;
360                send_instructions(
361                    client.rpc_client.clone(),
362                    &client.payer,
363                    client.opts.ws_url().as_str(),
364                    vec![ix],
365                    &[],
366                )
367                .await?;
368            }
369        }
370
371        Ok(())
372    }
373}
374
375#[derive(Serialize)]
376pub struct TaskQueue {
377    #[serde(with = "serde_pubkey")]
378    pub pubkey: Pubkey,
379    pub id: u32,
380    pub capacity: u16,
381    #[serde(with = "serde_pubkey")]
382    pub update_authority: Pubkey,
383    pub name: String,
384    pub min_crank_reward: u64,
385    pub balance: u64,
386}