// solana_transaction_utils/queue.rs

1use std::{sync::Arc, time::Duration};
2
3use futures::stream::{FuturesUnordered, StreamExt};
4use itertools::Itertools;
5use solana_client::nonblocking::rpc_client::RpcClient;
6use solana_sdk::{
7    address_lookup_table::AddressLookupTableAccount,
8    instruction::Instruction,
9    message::{v0, VersionedMessage},
10    signature::Keypair,
11    signer::Signer,
12    transaction::{TransactionError, VersionedTransaction},
13};
14use tokio::{
15    sync::mpsc::{channel, Receiver, Sender},
16    time::{interval, Interval},
17};
18use tracing::info;
19
20use crate::{
21    error::Error,
22    pack::{PackedTransaction, MAX_TRANSACTION_SIZE},
23    priority_fee::{auto_compute_price, compute_budget_instruction},
24    sender::PackedTransactionWithTasks,
25};
26
27#[derive(Debug, Clone)]
28pub struct TransactionTask<T: Send + Clone> {
29    pub task: T,
30    // What is this task worth in lamports? Will not run tx if it is not worth it. To guarentee task runs, set to u64::MAX
31    pub worth: u64,
32    pub instructions: Vec<Instruction>,
33    pub lookup_tables: Option<Vec<AddressLookupTableAccount>>,
34}
35
36#[derive(Debug)]
37pub struct CompletedTransactionTask<T: Send + Clone> {
38    pub err: Option<Error>,
39    pub fee: u64,
40    pub task: TransactionTask<T>,
41}
42
43pub struct TransactionQueueArgs<T: Send + Clone> {
44    pub rpc_client: Arc<RpcClient>,
45    pub ws_url: String,
46    pub payer: Arc<Keypair>,
47    pub batch_duration: Duration,
48    pub receiver: Receiver<TransactionTask<T>>,
49    pub result_sender: Sender<CompletedTransactionTask<T>>,
50    pub packed_tx_sender: Sender<PackedTransactionWithTasks<T>>,
51    pub max_sol_fee: u64,
52    pub send_in_parallel: bool,
53}
54
55pub struct TransactionQueueHandles<T: Send + Clone> {
56    pub sender: Sender<TransactionTask<T>>,
57    pub receiver: Receiver<TransactionTask<T>>,
58    pub result_sender: Sender<CompletedTransactionTask<T>>,
59    pub result_receiver: Receiver<CompletedTransactionTask<T>>,
60}
61
62pub fn create_transaction_queue_handles<T: Send + Clone>(
63    channel_capacity: usize,
64) -> TransactionQueueHandles<T> {
65    let (tx, rx) = channel::<TransactionTask<T>>(channel_capacity);
66    let (result_tx, result_rx) = channel::<CompletedTransactionTask<T>>(channel_capacity);
67    TransactionQueueHandles {
68        sender: tx,
69        receiver: rx,
70        result_sender: result_tx,
71        result_receiver: result_rx,
72    }
73}
74
/// Bundles whose transaction length is at or below this many bytes are kept
/// open for further packing; larger bundles are simulated right away.
const MAX_PACKABLE_TX_SIZE: usize = 800;
76
77pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
78    args: TransactionQueueArgs<T>,
79) -> Result<(), Error> {
80    let mut receiver = args.receiver;
81
82    // The currently staged bundle of tasks
83    let mut bundle = TaskBundle::new();
84    // The timer to wait for the batch duration if no new packable tasks show up
85    let mut wait_timer: Option<Interval> = None;
86    // The queue of tasks currently being simulated waiting to be sent to the sender
87    let mut simulation_queue = FuturesUnordered::new();
88
89    async fn simulate_transaction<T: Send + Clone>(
90        bundle: TaskBundle<T>,
91        rpc_client: Arc<RpcClient>,
92        payer: Arc<Keypair>,
93    ) -> (
94        Vec<TransactionTask<T>>,
95        Result<(Vec<Instruction>, Option<TransactionError>, u64), Error>,
96    ) {
97        let tasks = bundle.tasks;
98        let result = async {
99            let blockhash = rpc_client.get_latest_blockhash().await?;
100            let message = v0::Message::try_compile(
101                &payer.pubkey(),
102                &bundle.tx.instructions,
103                &bundle.lookup_tables,
104                blockhash,
105            )?;
106
107            let sim_result = rpc_client
108                .simulate_transaction(
109                    &VersionedTransaction::try_new(VersionedMessage::V0(message), &[&*payer])
110                        .map_err(Error::signer)?,
111                )
112                .await?;
113
114            if let Some(ref err) = sim_result.value.err {
115                info!(?err, ?sim_result.value.logs, "simulation error");
116            }
117
118            // Scale up by 1.2 just to be sure it'll succeed.
119            let compute_units =
120                (sim_result.value.units_consumed.unwrap_or(1000000) as f64 * 1.2) as u32;
121            let mut updated_instructions = bundle.tx.instructions.clone();
122            let compute_budget_ix = compute_budget_instruction(compute_units);
123            // Replace or insert compute budget instruction
124            if let Some(pos) = updated_instructions.iter().position(|ix| {
125                ix.program_id == solana_sdk::compute_budget::id()
126                    && ix.data.first() == compute_budget_ix.data.first()
127            }) {
128                updated_instructions[pos] = compute_budget_ix; // Replace existing
129            } else {
130                updated_instructions.insert(0, compute_budget_ix); // Insert at the beginning
131            }
132
133            let fee = if sim_result.value.err.is_some() {
134                0
135            } else {
136                let (ixs, fee) = auto_compute_price(
137                    &rpc_client,
138                    &updated_instructions,
139                    &payer.pubkey(),
140                    compute_units,
141                )
142                .await?;
143                updated_instructions = ixs;
144                fee
145            };
146
147            Ok((updated_instructions, sim_result.value.err, fee))
148        }
149        .await;
150
151        (tasks, result)
152    }
153
154    // Main loop with shutdown handling
155    loop {
156        tokio::select! {
157            // If we have a bundle waiting to be packed further and the timer runs out, send it to the sender
158            _ = async { if let Some(timer) = &mut wait_timer { timer.tick().await } else { std::future::pending().await } } => {
159                if !bundle.is_empty() {
160                    simulation_queue.push(simulate_transaction(
161                        bundle,
162                        args.rpc_client.clone(),
163                        args.payer.clone(),
164                    ));
165                    bundle = TaskBundle::new();
166                    wait_timer = None;
167                }
168            }
169
170            // If we have a new task, try to add it to the bundle
171            Some(task) = receiver.recv() => {
172                match bundle.add_task(task.clone()) {
173                    // Task is small, we can pack it further. Set the timer to wait for the batch duration
174                    Ok((len, added)) if added && len <= MAX_PACKABLE_TX_SIZE => {
175                        if wait_timer.is_none() {
176                            wait_timer = Some(interval(args.batch_duration));
177                        }
178                    }
179                    Ok((_, added)) if added => {
180                        // Bundle full, simulate it
181                        simulation_queue.push(simulate_transaction(
182                            bundle,
183                            args.rpc_client.clone(),
184                            args.payer.clone(),
185                        ));
186                        bundle = TaskBundle::new();
187                    }
188                    Ok((_, added)) if !added => {
189                        // Current task won't fit, simulate current bundle first
190                        if !bundle.is_empty() {
191                            simulation_queue.push(simulate_transaction(
192                                bundle,
193                                args.rpc_client.clone(),
194                                args.payer.clone(),
195                            ));
196                            bundle = TaskBundle::new();
197                        }
198                        // Try adding task to empty bundle
199                        if let Err(e) = bundle.add_task(task.clone()) {
200                            args.result_sender
201                                .send(CompletedTransactionTask {
202                                    err: Some(e),
203                                    task,
204                                    fee: 0,
205                                })
206                                .await?;
207                        }
208                    }
209                    Err(e) => {
210                        args.result_sender.send(CompletedTransactionTask {
211                            err: Some(e),
212                            task,
213                            fee: 0,
214                        }).await?;
215                    },
216                    _ => {
217                        // We should never get here
218                        panic!("Invalid return value from bundle.add_task");
219                    }
220                }
221            }
222
223            Some((tasks, result)) = simulation_queue.next() => {
224                match result {
225                    Ok((instructions, error, fee)) => {
226                        // Notify tasks they failed
227                        if let Some(e) = error {
228                            match e {
229                                TransactionError::InstructionError(failed_ix, _) => {
230                                    let failed_task_idx = {
231                                        let mut current_task: usize = 0;
232                                        let mut current_ix: usize = 2; // Skip compute budget instructions
233
234                                        // Find which task contains the failed instruction
235                                        while current_ix < failed_ix as usize {
236                                            if current_task >= tasks.len() {
237                                                break;
238                                            }
239                                            current_ix += tasks[current_task].instructions.len();
240                                            if current_ix < failed_ix as usize {
241                                                current_task += 1;
242                                            }
243                                        }
244                                        current_task
245                                    };
246
247                                    if failed_task_idx >= tasks.len() {
248                                        println!("Failed task index out of bounds {:?} failed_ix: {:?}, tasks lens: {:?}", failed_task_idx, failed_ix, tasks.iter().map(|t| t.instructions.len()).collect_vec());
249                                        for task in tasks {
250                                            args.result_sender.send(CompletedTransactionTask {
251                                                err: Some(Error::SimulatedTransactionError(e.clone())),
252                                                task,
253                                                fee: 0,
254                                            }).await?;
255                                        }
256                                    } else {
257                                        // Handle failed task
258                                        args.result_sender.send(CompletedTransactionTask {
259                                            err: Some(Error::SimulatedTransactionError(e)),
260                                            task: tasks[failed_task_idx].clone(),
261                                            fee: 0,
262                                        }).await?;
263
264                                        // Requeue remaining tasks
265                                        let mut new_bundle = TaskBundle::new();
266                                        for (i, task) in tasks.iter().enumerate() {
267                                            if i != failed_task_idx {
268                                                new_bundle.add_task(task.clone()).expect("add task");
269                                            }
270                                        }
271                                        if !new_bundle.is_empty() {
272                                            simulation_queue.push(simulate_transaction(
273                                                new_bundle,
274                                                args.rpc_client.clone(),
275                                                args.payer.clone(),
276                                            ));
277                                        }
278                                    }
279                                }
280                                _ => {
281                                    // Other errors affect all tasks
282                                    for task in tasks {
283                                        args.result_sender.send(CompletedTransactionTask {
284                                            err: Some(Error::SimulatedTransactionError(e.clone())),
285                                            task,
286                                            fee: 0,
287                                        }).await?;
288                                    }
289                                }
290                            }
291                        } else if fee > args.max_sol_fee || fee > tasks.iter().map(|t| t.worth).sum::<u64>() {
292                            // Fee too high, notify tasks
293                            for task in tasks {
294                                args.result_sender.send(CompletedTransactionTask {
295                                    err: Some(Error::FeeTooHigh),
296                                    task,
297                                    fee: 0,
298                                }).await?;
299                            }
300                        } else {
301                            // Simulation successful, send to transaction sender
302                            args.packed_tx_sender.send(PackedTransactionWithTasks {
303                                instructions,
304                                tasks,
305                                fee,
306                                re_sign_count: 0,
307                            }).await?;
308                        }
309                    }
310                    Err(e) => {
311                        // Simulation failed, notify tasks
312                        for task in tasks.iter() {
313                            args.result_sender.send(CompletedTransactionTask {
314                                err: Some(Error::RawSimulatedTransactionError(e.to_string())),
315                                task: task.clone(),
316                                fee: 0,
317                            }).await?;
318                        }
319                    }
320                }
321            }
322        }
323    }
324}
325
326struct TaskBundle<T: Send + Clone> {
327    tx: PackedTransaction,
328    tasks: Vec<TransactionTask<T>>,
329    lookup_tables: Vec<AddressLookupTableAccount>,
330}
331
332impl<T: Send + Clone> TaskBundle<T> {
333    fn new() -> Self {
334        Self {
335            tx: PackedTransaction::default(),
336            tasks: Vec::new(),
337            lookup_tables: Vec::new(),
338        }
339    }
340
341    fn is_empty(&self) -> bool {
342        self.tx.is_empty()
343    }
344
345    // Returns the length of the transaction and a boolean indicating if the task was added
346    fn add_task(&mut self, task: TransactionTask<T>) -> Result<(usize, bool), Error> {
347        let task_instructions = task.instructions.as_slice();
348        let mut test_luts = self.lookup_tables.clone();
349        if let Some(luts) = task.lookup_tables.clone() {
350            test_luts.extend(luts);
351        }
352
353        // Test if we can fit this task
354        let len = self.tx.transaction_len(task_instructions, &test_luts)?;
355
356        let mut added = false;
357        // Only add the task if it fits
358        if len <= MAX_TRANSACTION_SIZE {
359            added = true;
360            self.lookup_tables = test_luts;
361            self.tx.push(task_instructions, 0);
362            self.tasks.push(task);
363        }
364
365        Ok((len, added))
366    }
367}