tuktuk_cli/cmd/
task_queue.rs

1use std::str::FromStr;
2
3use clap::{Args, Subcommand};
4use serde::Serialize;
5use solana_sdk::{
6    instruction::Instruction, pubkey::Pubkey, signer::Signer, system_instruction::transfer,
7};
8use tuktuk::task_queue_name_mapping_key;
9use tuktuk_program::TaskQueueV0;
10use tuktuk_sdk::prelude::*;
11
12use crate::{
13    client::{send_instructions, CliClient},
14    cmd::Opts,
15    result::Result,
16    serde::{print_json, serde_pubkey},
17};
18
// Top-level arguments for the task queue command group: holds the subcommand
// that `TaskQueueCmd::run` dispatches on.
//
// NOTE: plain `//` comments are used on purpose. With clap's derive macros a
// `///` doc comment becomes user-visible help/about text.
#[derive(Debug, Args)]
pub struct TaskQueueCmd {
    // The task queue operation selected on the command line.
    #[command(subcommand)]
    pub cmd: Cmd,
}
24
// Task queue operations exposed by the CLI. Each variant is handled by the
// matching arm in `TaskQueueCmd::run`.
//
// NOTE: plain `//` comments are used on purpose. With clap's derive macros a
// `///` doc comment becomes user-visible help/about text, which would change
// the CLI output.
#[derive(Debug, Subcommand)]
pub enum Cmd {
    // Creates a task queue, registers a queue authority on it in the same
    // transaction, and prints the resulting queue as JSON.
    Create {
        // Queue authority to register; `run` falls back to the CLI payer when
        // this is omitted.
        #[arg(long)]
        queue_authority: Option<Pubkey>,
        // Forwarded as-is to the create instruction builder; `run` also falls
        // back to the CLI payer for it when adding the queue authority.
        #[arg(long)]
        update_authority: Option<Pubkey>,
        #[arg(long)]
        capacity: u16,
        #[arg(long)]
        name: String,
        // Lamports the payer transfers to the queue as part of the create
        // transaction.
        #[arg(
            long,
            help = "Initial funding amount in lamports. Task queue funding is only required to pay extra rent for tasks that run as a result of other tasks.",
            default_value = "0"
        )]
        funding_amount: u64,
        #[arg(long, help = "Default crank reward in lamports")]
        min_crank_reward: u64,
        // `run` passes an empty list to the program when this is omitted.
        #[arg(long, help = "Lookup tables to create")]
        lookup_tables: Option<Vec<Pubkey>>,
        #[arg(
            long,
            help = "Age before a task is considered stale and can be deleted without running the instructions. This is effectively the retention rate for debugging purposes."
        )]
        stale_task_age: u32,
    },
    // Sends an update instruction for an existing task queue. Every setting is
    // optional and is forwarded to the program unchanged (including `None`).
    Update {
        #[command(flatten)]
        task_queue: TaskQueueArg,
        #[arg(long, help = "Default crank reward in lamports")]
        min_crank_reward: Option<u64>,
        #[arg(long, help = "Lookup tables to create")]
        lookup_tables: Option<Vec<Pubkey>>,
        #[arg(long)]
        update_authority: Option<Pubkey>,
        #[arg(long)]
        capacity: Option<u16>,
        #[arg(
            long,
            help = "Age before a task is considered stale and can be deleted without running the instructions. This is effectively the retention rate for debugging purposes."
        )]
        stale_task_age: Option<u32>,
    },
    // Fetches a task queue account plus its lamport balance and prints them as
    // JSON.
    Get {
        #[command(flatten)]
        task_queue: TaskQueueArg,
    },
    // Transfers lamports from the CLI payer to the task queue account.
    Fund {
        #[command(flatten)]
        task_queue: TaskQueueArg,
        #[arg(long, help = "Amount to fund the task queue with, in lamports")]
        amount: u64,
    },
    // Sends the close instruction for the task queue.
    Close {
        #[command(flatten)]
        task_queue: TaskQueueArg,
    },
    // Registers an additional queue authority on the task queue.
    AddQueueAuthority {
        #[command(flatten)]
        task_queue: TaskQueueArg,
        #[arg(long, help = "Authority to add")]
        queue_authority: Pubkey,
    },
    // Removes a queue authority from the task queue.
    RemoveQueueAuthority {
        #[command(flatten)]
        task_queue: TaskQueueArg,
        #[arg(long, help = "Authority to remove")]
        queue_authority: Pubkey,
    },
}
96
// Flattened selector that identifies one task queue. All three flags are
// optional; `get_pubkey` consults them in the order pubkey, id, name and uses
// the first one that is present.
//
// NOTE: plain `//` comments are used on purpose. With clap's derive macros a
// `///` doc comment becomes user-visible help text.
#[derive(Debug, Args)]
pub struct TaskQueueArg {
    // Queue name; resolved through the on-chain name mapping account.
    #[arg(long = "task-queue-name", name = "task-queue-name")]
    pub name: Option<String>,
    // Numeric queue id; the address is derived from it and the tuktuk config key.
    #[arg(long = "task-queue-id", name = "task-queue-id")]
    pub id: Option<u32>,
    // Queue address as a string; parsed into a `Pubkey` by `get_pubkey`.
    #[arg(long = "task-queue-pubkey", name = "task-queue-pubkey")]
    pub pubkey: Option<String>,
}
106
107impl TaskQueueArg {
108    pub async fn get_pubkey(&self, client: &CliClient) -> Result<Option<Pubkey>> {
109        let tuktuk_config_key = tuktuk::config_key();
110
111        if let Some(pubkey) = &self.pubkey {
112            // Use the provided pubkey directly
113            Ok(Some(Pubkey::from_str(pubkey)?))
114        } else if let Some(id) = self.id {
115            Ok(Some(tuktuk::task_queue::key(&tuktuk_config_key, id)))
116        } else if let Some(name) = &self.name {
117            let mapping: tuktuk_program::TaskQueueNameMappingV0 = client
118                .as_ref()
119                .anchor_account(&task_queue_name_mapping_key(&tuktuk_config_key, name))
120                .await?
121                .ok_or_else(|| anyhow::anyhow!("Task queue not found"))?;
122            Ok(Some(mapping.task_queue))
123        } else {
124            Ok(None)
125        }
126    }
127}
128
129impl TaskQueueCmd {
130    async fn fund_task_queue_ix(
131        client: &CliClient,
132        task_queue_key: &Pubkey,
133        amount: u64,
134    ) -> Result<Instruction> {
135        let ix = transfer(&client.payer.pubkey(), task_queue_key, amount);
136
137        Ok(ix)
138    }
139
140    pub async fn run(&self, opts: Opts) -> Result {
141        match &self.cmd {
142            Cmd::Create {
143                queue_authority,
144                update_authority,
145                capacity,
146                name,
147                min_crank_reward,
148                funding_amount,
149                lookup_tables,
150                stale_task_age,
151            } => {
152                let client = opts.client().await?;
153
154                let (key, ix) = tuktuk::task_queue::create(
155                    client.rpc_client.as_ref(),
156                    client.payer.pubkey(),
157                    tuktuk_program::types::InitializeTaskQueueArgsV0 {
158                        capacity: *capacity,
159                        min_crank_reward: *min_crank_reward,
160                        name: name.clone(),
161                        lookup_tables: lookup_tables.clone().unwrap_or_default(),
162                        stale_task_age: *stale_task_age,
163                    },
164                    *update_authority,
165                )
166                .await?;
167                let add_queue_authority_ix = tuktuk::task_queue::add_queue_authority_ix(
168                    client.payer.pubkey(),
169                    key,
170                    queue_authority.unwrap_or(client.payer.pubkey()),
171                    update_authority.unwrap_or(client.payer.pubkey()),
172                )?;
173                // Fund if amount specified
174                let fund_ix = Self::fund_task_queue_ix(&client, &key, *funding_amount).await?;
175
176                send_instructions(
177                    client.rpc_client.clone(),
178                    &client.payer,
179                    client.opts.ws_url().as_str(),
180                    &[fund_ix, ix, add_queue_authority_ix],
181                    &[],
182                )
183                .await?;
184
185                let task_queue: TaskQueueV0 = client
186                    .as_ref()
187                    .anchor_account(&key)
188                    .await?
189                    .ok_or_else(|| anyhow::anyhow!("Task queue not found: {}", key))?;
190                let task_queue_balance = client.rpc_client.get_balance(&key).await?;
191
192                print_json(&TaskQueue {
193                    pubkey: key,
194                    id: task_queue.id,
195                    name: name.clone(),
196                    capacity: task_queue.capacity,
197                    update_authority: task_queue.update_authority,
198                    min_crank_reward: task_queue.min_crank_reward,
199                    balance: task_queue_balance,
200                    stale_task_age: *stale_task_age,
201                })?;
202            }
203            Cmd::Update {
204                task_queue,
205                min_crank_reward,
206                lookup_tables,
207                update_authority,
208                capacity,
209                stale_task_age,
210            } => {
211                let client = opts.client().await?;
212                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
213                    anyhow::anyhow!(
214                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
215                    )
216                })?;
217                let ix = tuktuk::task_queue::update(
218                    client.rpc_client.as_ref(),
219                    client.payer.pubkey(),
220                    task_queue_key,
221                    tuktuk_program::types::UpdateTaskQueueArgsV0 {
222                        capacity: *capacity,
223                        min_crank_reward: *min_crank_reward,
224                        lookup_tables: lookup_tables.clone(),
225                        update_authority: *update_authority,
226                        stale_task_age: *stale_task_age,
227                    },
228                )
229                .await?;
230
231                send_instructions(
232                    client.rpc_client.clone(),
233                    &client.payer,
234                    client.opts.ws_url().as_str(),
235                    &[ix],
236                    &[],
237                )
238                .await?;
239            }
240            Cmd::Get { task_queue } => {
241                let client = opts.client().await?;
242                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
243                    anyhow::anyhow!(
244                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
245                    )
246                })?;
247                let task_queue: TaskQueueV0 = client
248                    .rpc_client
249                    .anchor_account(&task_queue_key)
250                    .await?
251                    .ok_or_else(|| anyhow::anyhow!("Task queue not found: {}", task_queue_key))?;
252
253                let task_queue_balance = client.rpc_client.get_balance(&task_queue_key).await?;
254                let serializable = TaskQueue {
255                    pubkey: task_queue_key,
256                    id: task_queue.id,
257                    capacity: task_queue.capacity,
258                    update_authority: task_queue.update_authority,
259                    name: task_queue.name,
260                    min_crank_reward: task_queue.min_crank_reward,
261                    balance: task_queue_balance,
262                    stale_task_age: task_queue.stale_task_age,
263                };
264                print_json(&serializable)?;
265            }
266            Cmd::Fund { task_queue, amount } => {
267                let client = opts.client().await?;
268                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
269                    anyhow::anyhow!(
270                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
271                    )
272                })?;
273
274                let fund_ix = Self::fund_task_queue_ix(&client, &task_queue_key, *amount).await?;
275                send_instructions(
276                    client.rpc_client.clone(),
277                    &client.payer,
278                    client.opts.ws_url().as_str(),
279                    &[fund_ix],
280                    &[],
281                )
282                .await?;
283            }
284            Cmd::AddQueueAuthority {
285                task_queue,
286                queue_authority,
287            } => {
288                let client = opts.client().await?;
289                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
290                    anyhow::anyhow!(
291                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
292                    )
293                })?;
294                let ix = tuktuk::task_queue::add_queue_authority(
295                    client.rpc_client.as_ref(),
296                    client.payer.pubkey(),
297                    task_queue_key,
298                    *queue_authority,
299                )
300                .await?;
301                send_instructions(
302                    client.rpc_client.clone(),
303                    &client.payer,
304                    client.opts.ws_url().as_str(),
305                    &[ix],
306                    &[],
307                )
308                .await?;
309            }
310            Cmd::RemoveQueueAuthority {
311                task_queue,
312                queue_authority,
313            } => {
314                let client = opts.client().await?;
315                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
316                    anyhow::anyhow!(
317                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
318                    )
319                })?;
320                let ix = tuktuk::task_queue::remove_queue_authority(
321                    client.rpc_client.as_ref(),
322                    client.payer.pubkey(),
323                    task_queue_key,
324                    *queue_authority,
325                )
326                .await?;
327                send_instructions(
328                    client.rpc_client.clone(),
329                    &client.payer,
330                    client.opts.ws_url().as_str(),
331                    &[ix],
332                    &[],
333                )
334                .await?;
335            }
336            Cmd::Close { task_queue } => {
337                let client = opts.client().await?;
338                let task_queue_key = task_queue.get_pubkey(&client).await?.ok_or_else(|| {
339                    anyhow::anyhow!(
340                        "Must provide task-queue-name, task-queue-id, or task-queue-pubkey"
341                    )
342                })?;
343
344                let ix = tuktuk::task_queue::close(
345                    client.rpc_client.as_ref(),
346                    task_queue_key,
347                    client.payer.pubkey(),
348                    client.payer.pubkey(),
349                )
350                .await?;
351                send_instructions(
352                    client.rpc_client.clone(),
353                    &client.payer,
354                    client.opts.ws_url().as_str(),
355                    &[ix],
356                    &[],
357                )
358                .await?;
359            }
360        }
361
362        Ok(())
363    }
364}
365
/// JSON view of a task queue as printed by the `Create` and `Get` subcommands.
///
/// Fields are serialized in declaration order; both pubkey fields go through
/// the crate's `serde_pubkey` module.
#[derive(Serialize)]
pub struct TaskQueue {
    /// Address of the task queue account.
    #[serde(with = "serde_pubkey")]
    pub pubkey: Pubkey,
    /// Id stored in the task queue account.
    pub id: u32,
    /// Capacity stored in the task queue account.
    pub capacity: u16,
    /// Update authority stored in the task queue account.
    #[serde(with = "serde_pubkey")]
    pub update_authority: Pubkey,
    /// Name of the task queue.
    pub name: String,
    /// Crank reward setting of the task queue, in lamports.
    pub min_crank_reward: u64,
    /// Lamport balance of the task queue account as returned by the RPC node.
    pub balance: u64,
    /// Stale task age setting of the task queue.
    pub stale_task_age: u32,
}