tuktuk_cli/cmd/
task_queue.rs

1use std::str::FromStr;
2
3use clap::{Args, Subcommand};
4use serde::Serialize;
5use solana_sdk::{
6    instruction::Instruction, pubkey::Pubkey, signer::Signer, system_instruction::transfer,
7};
8use tuktuk::task_queue_name_mapping_key;
9use tuktuk_program::{TaskQueueV0, TuktukConfigV0};
10use tuktuk_sdk::prelude::*;
11
12use crate::{
13    client::{send_instructions, CliClient},
14    cmd::Opts,
15    result::Result,
16    serde::{print_json, serde_pubkey},
17};
18
19#[derive(Debug, Args)]
20pub struct TaskQueueCmd {
21    #[command(subcommand)]
22    pub cmd: Cmd,
23}
24
25#[derive(Debug, Subcommand)]
26pub enum Cmd {
27    Create {
28        #[arg(long)]
29        queue_authority: Option<Pubkey>,
30        #[arg(long)]
31        update_authority: Option<Pubkey>,
32        #[arg(long)]
33        capacity: u16,
34        #[arg(long)]
35        name: String,
36        #[arg(
37            long,
38            help = "Initial funding amount in lamports. Task queue funding is only required to pay extra rent for tasks that run as a result of other tasks.",
39            default_value = "0"
40        )]
41        funding_amount: u64,
42        #[arg(long, help = "Default crank reward in lamports")]
43        min_crank_reward: u64,
44        #[arg(long, help = "Lookup tables to create")]
45        lookup_tables: Option<Vec<Pubkey>>,
46        #[arg(
47            long,
48            help = "Age before a task is considered stale and can be deleted without running the instructions. This is effectively the retention rate for debugging purposes."
49        )]
50        stale_task_age: u32,
51    },
52    Update {
53        #[command(flatten)]
54        task_queue: TaskQueueArg,
55        #[arg(long, help = "Default crank reward in lamports")]
56        min_crank_reward: Option<u64>,
57        #[arg(long, help = "Lookup tables to create")]
58        lookup_tables: Option<Vec<Pubkey>>,
59        #[arg(long)]
60        update_authority: Option<Pubkey>,
61        #[arg(long)]
62        capacity: Option<u16>,
63        #[arg(
64            long,
65            help = "Age before a task is considered stale and can be deleted without running the instructions. This is effectively the retention rate for debugging purposes."
66        )]
67        stale_task_age: Option<u32>,
68    },
69    Get {
70        #[command(flatten)]
71        task_queue: TaskQueueArg,
72    },
73    Fund {
74        #[command(flatten)]
75        task_queue: TaskQueueArg,
76        #[arg(long, help = "Amount to fund the task queue with, in lamports")]
77        amount: u64,
78    },
79    Close {
80        #[command(flatten)]
81        task_queue: TaskQueueArg,
82    },
83    AddQueueAuthority {
84        #[command(flatten)]
85        task_queue: TaskQueueArg,
86        #[arg(long, help = "Authority to add")]
87        queue_authority: Pubkey,
88    },
89    RemoveQueueAuthority {
90        #[command(flatten)]
91        task_queue: TaskQueueArg,
92        #[arg(long, help = "Authority to remove")]
93        queue_authority: Pubkey,
94    },
95}
96
97#[derive(Debug, Args)]
98pub struct TaskQueueArg {
99    #[arg(long = "task-queue-name", name = "task-queue-name")]
100    pub name: Option<String>,
101    #[arg(long = "task-queue-id", name = "task-queue-id")]
102    pub id: Option<u32>,
103    #[arg(long = "task-queue-pubkey", name = "task-queue-pubkey")]
104    pub pubkey: Option<String>,
105}
106
107impl TaskQueueArg {
108    pub async fn get_pubkey(&self, client: &CliClient) -> Result<Option<Pubkey>> {
109        let tuktuk_config_key = tuktuk::config_key();
110
111        if let Some(pubkey) = &self.pubkey {
112            // Use the provided pubkey directly
113            Ok(Some(Pubkey::from_str(pubkey)?))
114        } else if let Some(id) = self.id {
115            Ok(Some(tuktuk::task_queue::key(&tuktuk_config_key, id)))
116        } else if let Some(name) = &self.name {
117            let mapping: tuktuk_program::TaskQueueNameMappingV0 = client
118                .as_ref()
119                .anchor_account(&task_queue_name_mapping_key(&tuktuk_config_key, name))
120                .await?
121                .ok_or_else(|| anyhow::anyhow!("Task queue not found"))?;
122            Ok(Some(mapping.task_queue))
123        } else {
124            Ok(None)
125        }
126    }
127}
128
129impl TaskQueueCmd {
130    async fn fund_task_queue_ix(
131        client: &CliClient,
132        task_queue_key: &Pubkey,
133        amount: u64,
134    ) -> Result<Instruction> {
135        let ix = transfer(&client.payer.pubkey(), task_queue_key, amount);
136
137        Ok(ix)
138    }
139
140    pub async fn run(&self, opts: Opts) -> Result {
141        match &self.cmd {
142            Cmd::Create {
143                queue_authority,
144                update_authority,
145                capacity,
146                name,
147                min_crank_reward,
148                funding_amount,
149                lookup_tables,
150                stale_task_age,
151            } => {
152                let client = opts.client().await?;
153
154                let (key, ix) = tuktuk::task_queue::create(
155                    client.rpc_client.as_ref(),
156                    client.payer.pubkey(),
157                    tuktuk_program::types::InitializeTaskQueueArgsV0 {
158                        capacity: *capacity,
159                        min_crank_reward: *min_crank_reward,
160                        name: name.clone(),
161                        lookup_tables: lookup_tables.clone().unwrap_or_default(),
162                        stale_task_age: *stale_task_age,
163                    },
164                    *update_authority,
165                )
166                .await?;
167                let add_queue_authority_ix = tuktuk::task_queue::add_queue_authority_ix(
168                    client.payer.pubkey(),
169                    key,
170                    queue_authority.unwrap_or(client.payer.pubkey()),
171                    update_authority.unwrap_or(client.payer.pubkey()),
172                )?;
173                // Fund if amount specified
174                let config: TuktukConfigV0 = client
175                    .as_ref()
176                    .anchor_account(&tuktuk::config_key())
177                    .await?
178                    .ok_or_else(|| anyhow::anyhow!("Tuktuk config account not found"))?;
179                if *funding_amount < config.min_deposit {
180                    return Err(anyhow::anyhow!(
181                        "Funding amount must be greater than the minimum deposit: {}",
182                        config.min_deposit
183                    ));
184                }
185                let fund_ix = Self::fund_task_queue_ix(&client, &key, *funding_amount).await?;
186
187                send_instructions(
188                    client.rpc_client.clone(),
189                    &client.payer,
190                    client.opts.ws_url().as_str(),
191                    vec![fund_ix, ix, add_queue_authority_ix],
192                    &[],
193                )
194                .await?;
195
196                let task_queue: TaskQueueV0 = client
197                    .as_ref()
198                    .anchor_account(&key)
199                    .await?
200                    .ok_or_else(|| anyhow::anyhow!("Task queue not found: {}", key))?;
201                let task_queue_balance = client.rpc_client.get_balance(&key).await?;
202
203                print_json(&TaskQueue {
204                    pubkey: key,
205                    id: task_queue.id,
206                    name: name.clone(),
207                    capacity: task_queue.capacity,
208                    update_authority: task_queue.update_authority,
209                    min_crank_reward: task_queue.min_crank_reward,
210                    balance: task_queue_balance,
211                    stale_task_age: *stale_task_age,
212                })?;
213            }
214            Cmd::Update {
215                task_queue,
216                min_crank_reward,
217                lookup_tables,
218                update_authority,
219                capacity,
220                stale_task_age,
221            } => {
222                let client = opts.client().await?;
223                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
224                    anyhow::anyhow!(
225                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
226                    )
227                })?;
228                let ix = tuktuk::task_queue::update(
229                    client.rpc_client.as_ref(),
230                    client.payer.pubkey(),
231                    task_queue_key,
232                    tuktuk_program::types::UpdateTaskQueueArgsV0 {
233                        capacity: *capacity,
234                        min_crank_reward: *min_crank_reward,
235                        lookup_tables: lookup_tables.clone(),
236                        update_authority: *update_authority,
237                        stale_task_age: *stale_task_age,
238                    },
239                )
240                .await?;
241
242                send_instructions(
243                    client.rpc_client.clone(),
244                    &client.payer,
245                    client.opts.ws_url().as_str(),
246                    vec![ix],
247                    &[],
248                )
249                .await?;
250            }
251            Cmd::Get { task_queue } => {
252                let client = opts.client().await?;
253                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
254                    anyhow::anyhow!(
255                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
256                    )
257                })?;
258                let task_queue: TaskQueueV0 = client
259                    .rpc_client
260                    .anchor_account(&task_queue_key)
261                    .await?
262                    .ok_or_else(|| anyhow::anyhow!("Task queue not found: {}", task_queue_key))?;
263
264                let task_queue_balance = client.rpc_client.get_balance(&task_queue_key).await?;
265                let serializable = TaskQueue {
266                    pubkey: task_queue_key,
267                    id: task_queue.id,
268                    capacity: task_queue.capacity,
269                    update_authority: task_queue.update_authority,
270                    name: task_queue.name,
271                    min_crank_reward: task_queue.min_crank_reward,
272                    balance: task_queue_balance,
273                    stale_task_age: task_queue.stale_task_age,
274                };
275                print_json(&serializable)?;
276            }
277            Cmd::Fund { task_queue, amount } => {
278                let client = opts.client().await?;
279                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
280                    anyhow::anyhow!(
281                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
282                    )
283                })?;
284
285                let fund_ix = Self::fund_task_queue_ix(&client, &task_queue_key, *amount).await?;
286                send_instructions(
287                    client.rpc_client.clone(),
288                    &client.payer,
289                    client.opts.ws_url().as_str(),
290                    vec![fund_ix],
291                    &[],
292                )
293                .await?;
294            }
295            Cmd::AddQueueAuthority {
296                task_queue,
297                queue_authority,
298            } => {
299                let client = opts.client().await?;
300                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
301                    anyhow::anyhow!(
302                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
303                    )
304                })?;
305                let ix = tuktuk::task_queue::add_queue_authority(
306                    client.rpc_client.as_ref(),
307                    client.payer.pubkey(),
308                    task_queue_key,
309                    *queue_authority,
310                )
311                .await?;
312                send_instructions(
313                    client.rpc_client.clone(),
314                    &client.payer,
315                    client.opts.ws_url().as_str(),
316                    vec![ix],
317                    &[],
318                )
319                .await?;
320            }
321            Cmd::RemoveQueueAuthority {
322                task_queue,
323                queue_authority,
324            } => {
325                let client = opts.client().await?;
326                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
327                    anyhow::anyhow!(
328                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
329                    )
330                })?;
331                let ix = tuktuk::task_queue::remove_queue_authority(
332                    client.rpc_client.as_ref(),
333                    client.payer.pubkey(),
334                    task_queue_key,
335                    *queue_authority,
336                )
337                .await?;
338                send_instructions(
339                    client.rpc_client.clone(),
340                    &client.payer,
341                    client.opts.ws_url().as_str(),
342                    vec![ix],
343                    &[],
344                )
345                .await?;
346            }
347            Cmd::Close { task_queue } => {
348                let client = opts.client().await?;
349                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
350                    anyhow::anyhow!(
351                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
352                    )
353                })?;
354
355                let ix = tuktuk::task_queue::close(
356                    client.rpc_client.as_ref(),
357                    task_queue_key,
358                    client.payer.pubkey(),
359                    client.payer.pubkey(),
360                )
361                .await?;
362                send_instructions(
363                    client.rpc_client.clone(),
364                    &client.payer,
365                    client.opts.ws_url().as_str(),
366                    vec![ix],
367                    &[],
368                )
369                .await?;
370            }
371        }
372
373        Ok(())
374    }
375}
376
377#[derive(Serialize)]
378pub struct TaskQueue {
379    #[serde(with = "serde_pubkey")]
380    pub pubkey: Pubkey,
381    pub id: u32,
382    pub capacity: u16,
383    #[serde(with = "serde_pubkey")]
384    pub update_authority: Pubkey,
385    pub name: String,
386    pub min_crank_reward: u64,
387    pub balance: u64,
388    pub stale_task_age: u32,
389}