solana_transaction_utils/
queue.rs1use std::{sync::Arc, time::Duration};
2
3use futures::stream::{FuturesUnordered, StreamExt};
4use itertools::Itertools;
5use solana_client::nonblocking::rpc_client::RpcClient;
6use solana_sdk::{
7 address_lookup_table::AddressLookupTableAccount,
8 instruction::Instruction,
9 message::{v0, VersionedMessage},
10 signature::Keypair,
11 signer::Signer,
12 transaction::{TransactionError, VersionedTransaction},
13};
14use tokio::{
15 sync::mpsc::{channel, Receiver, Sender},
16 time::{interval, Interval},
17};
18use tracing::info;
19
20use crate::{
21 error::Error,
22 pack::{PackedTransaction, MAX_TRANSACTION_SIZE},
23 priority_fee::{auto_compute_price, compute_budget_instruction},
24 sender::PackedTransactionWithTasks,
25};
26
27#[derive(Debug, Clone)]
28pub struct TransactionTask<T: Send + Clone> {
29 pub task: T,
30 pub worth: u64,
32 pub instructions: Vec<Instruction>,
33 pub lookup_tables: Option<Vec<AddressLookupTableAccount>>,
34}
35
36#[derive(Debug)]
37pub struct CompletedTransactionTask<T: Send + Clone> {
38 pub err: Option<Error>,
39 pub fee: u64,
40 pub task: TransactionTask<T>,
41}
42
43pub struct TransactionQueueArgs<T: Send + Clone> {
44 pub rpc_client: Arc<RpcClient>,
45 pub ws_url: String,
46 pub payer: Arc<Keypair>,
47 pub batch_duration: Duration,
48 pub receiver: Receiver<TransactionTask<T>>,
49 pub result_sender: Sender<CompletedTransactionTask<T>>,
50 pub packed_tx_sender: Sender<PackedTransactionWithTasks<T>>,
51 pub max_sol_fee: u64,
52 pub send_in_parallel: bool,
53}
54
55pub struct TransactionQueueHandles<T: Send + Clone> {
56 pub sender: Sender<TransactionTask<T>>,
57 pub receiver: Receiver<TransactionTask<T>>,
58 pub result_sender: Sender<CompletedTransactionTask<T>>,
59 pub result_receiver: Receiver<CompletedTransactionTask<T>>,
60}
61
62pub fn create_transaction_queue_handles<T: Send + Clone>(
63 channel_capacity: usize,
64) -> TransactionQueueHandles<T> {
65 let (tx, rx) = channel::<TransactionTask<T>>(channel_capacity);
66 let (result_tx, result_rx) = channel::<CompletedTransactionTask<T>>(channel_capacity);
67 TransactionQueueHandles {
68 sender: tx,
69 receiver: rx,
70 result_sender: result_tx,
71 result_receiver: result_rx,
72 }
73}
74
/// Length threshold for the pending bundle.
///
/// When adding a task brings the length reported by `TaskBundle::add_task`
/// above this value, the bundle is sent to simulation right away instead of
/// waiting for the batch timer.
const MAX_PACKABLE_TX_SIZE: usize = 800;
76
77pub async fn create_transaction_queue<T: Send + Clone + 'static + Sync>(
78 args: TransactionQueueArgs<T>,
79) -> Result<(), Error> {
80 let mut receiver = args.receiver;
81
82 let mut bundle = TaskBundle::new();
84 let mut wait_timer: Option<Interval> = None;
86 let mut simulation_queue = FuturesUnordered::new();
88
89 async fn simulate_transaction<T: Send + Clone>(
90 bundle: TaskBundle<T>,
91 rpc_client: Arc<RpcClient>,
92 payer: Arc<Keypair>,
93 ) -> (
94 Vec<TransactionTask<T>>,
95 Result<(Vec<Instruction>, Option<TransactionError>, u64), Error>,
96 ) {
97 let tasks = bundle.tasks;
98 let result = async {
99 let blockhash = rpc_client.get_latest_blockhash().await?;
100 let message = v0::Message::try_compile(
101 &payer.pubkey(),
102 &bundle.tx.instructions,
103 &bundle.lookup_tables,
104 blockhash,
105 )?;
106
107 let sim_result = rpc_client
108 .simulate_transaction(
109 &VersionedTransaction::try_new(VersionedMessage::V0(message), &[&*payer])
110 .map_err(Error::signer)?,
111 )
112 .await?;
113
114 if let Some(ref err) = sim_result.value.err {
115 info!(?err, ?sim_result.value.logs, "simulation error");
116 }
117
118 let compute_units =
120 (sim_result.value.units_consumed.unwrap_or(1000000) as f64 * 1.2) as u32;
121 let mut updated_instructions = bundle.tx.instructions.clone();
122 let compute_budget_ix = compute_budget_instruction(compute_units);
123 if let Some(pos) = updated_instructions.iter().position(|ix| {
125 ix.program_id == solana_sdk::compute_budget::id()
126 && ix.data.first() == compute_budget_ix.data.first()
127 }) {
128 updated_instructions[pos] = compute_budget_ix; } else {
130 updated_instructions.insert(0, compute_budget_ix); }
132
133 let fee = if sim_result.value.err.is_some() {
134 0
135 } else {
136 let (ixs, fee) = auto_compute_price(
137 &rpc_client,
138 &updated_instructions,
139 &payer.pubkey(),
140 compute_units,
141 )
142 .await?;
143 updated_instructions = ixs;
144 fee
145 };
146
147 Ok((updated_instructions, sim_result.value.err, fee))
148 }
149 .await;
150
151 (tasks, result)
152 }
153
154 loop {
156 tokio::select! {
157 _ = async { if let Some(timer) = &mut wait_timer { timer.tick().await } else { std::future::pending().await } } => {
159 if !bundle.is_empty() {
160 simulation_queue.push(simulate_transaction(
161 bundle,
162 args.rpc_client.clone(),
163 args.payer.clone(),
164 ));
165 bundle = TaskBundle::new();
166 wait_timer = None;
167 }
168 }
169
170 Some(task) = receiver.recv() => {
172 match bundle.add_task(task.clone()) {
173 Ok((len, added)) if added && len <= MAX_PACKABLE_TX_SIZE => {
175 if wait_timer.is_none() {
176 wait_timer = Some(interval(args.batch_duration));
177 }
178 }
179 Ok((_, added)) if added => {
180 simulation_queue.push(simulate_transaction(
182 bundle,
183 args.rpc_client.clone(),
184 args.payer.clone(),
185 ));
186 bundle = TaskBundle::new();
187 }
188 Ok((_, added)) if !added => {
189 if !bundle.is_empty() {
191 simulation_queue.push(simulate_transaction(
192 bundle,
193 args.rpc_client.clone(),
194 args.payer.clone(),
195 ));
196 bundle = TaskBundle::new();
197 }
198 if let Err(e) = bundle.add_task(task.clone()) {
200 args.result_sender
201 .send(CompletedTransactionTask {
202 err: Some(e),
203 task,
204 fee: 0,
205 })
206 .await?;
207 }
208 }
209 Err(e) => {
210 args.result_sender.send(CompletedTransactionTask {
211 err: Some(e),
212 task,
213 fee: 0,
214 }).await?;
215 },
216 _ => {
217 panic!("Invalid return value from bundle.add_task");
219 }
220 }
221 }
222
223 Some((tasks, result)) = simulation_queue.next() => {
224 match result {
225 Ok((instructions, error, fee)) => {
226 if let Some(e) = error {
228 match e {
229 TransactionError::InstructionError(failed_ix, _) => {
230 let failed_task_idx = {
231 let mut current_task: usize = 0;
232 let mut current_ix: usize = 2; while current_ix < failed_ix as usize {
236 if current_task >= tasks.len() {
237 break;
238 }
239 current_ix += tasks[current_task].instructions.len();
240 if current_ix < failed_ix as usize {
241 current_task += 1;
242 }
243 }
244 current_task
245 };
246
247 if failed_task_idx >= tasks.len() {
248 println!("Failed task index out of bounds {:?} failed_ix: {:?}, tasks lens: {:?}", failed_task_idx, failed_ix, tasks.iter().map(|t| t.instructions.len()).collect_vec());
249 for task in tasks {
250 args.result_sender.send(CompletedTransactionTask {
251 err: Some(Error::SimulatedTransactionError(e.clone())),
252 task,
253 fee: 0,
254 }).await?;
255 }
256 } else {
257 args.result_sender.send(CompletedTransactionTask {
259 err: Some(Error::SimulatedTransactionError(e)),
260 task: tasks[failed_task_idx].clone(),
261 fee: 0,
262 }).await?;
263
264 let mut new_bundle = TaskBundle::new();
266 for (i, task) in tasks.iter().enumerate() {
267 if i != failed_task_idx {
268 new_bundle.add_task(task.clone()).expect("add task");
269 }
270 }
271 if !new_bundle.is_empty() {
272 simulation_queue.push(simulate_transaction(
273 new_bundle,
274 args.rpc_client.clone(),
275 args.payer.clone(),
276 ));
277 }
278 }
279 }
280 _ => {
281 for task in tasks {
283 args.result_sender.send(CompletedTransactionTask {
284 err: Some(Error::SimulatedTransactionError(e.clone())),
285 task,
286 fee: 0,
287 }).await?;
288 }
289 }
290 }
291 } else if fee > args.max_sol_fee || fee > tasks.iter().map(|t| t.worth).sum::<u64>() {
292 for task in tasks {
294 args.result_sender.send(CompletedTransactionTask {
295 err: Some(Error::FeeTooHigh),
296 task,
297 fee: 0,
298 }).await?;
299 }
300 } else {
301 args.packed_tx_sender.send(PackedTransactionWithTasks {
303 instructions,
304 tasks,
305 fee,
306 re_sign_count: 0,
307 }).await?;
308 }
309 }
310 Err(e) => {
311 for task in tasks.iter() {
313 args.result_sender.send(CompletedTransactionTask {
314 err: Some(Error::RawSimulatedTransactionError(e.to_string())),
315 task: task.clone(),
316 fee: 0,
317 }).await?;
318 }
319 }
320 }
321 }
322 }
323 }
324}
325
326struct TaskBundle<T: Send + Clone> {
327 tx: PackedTransaction,
328 tasks: Vec<TransactionTask<T>>,
329 lookup_tables: Vec<AddressLookupTableAccount>,
330}
331
332impl<T: Send + Clone> TaskBundle<T> {
333 fn new() -> Self {
334 Self {
335 tx: PackedTransaction::default(),
336 tasks: Vec::new(),
337 lookup_tables: Vec::new(),
338 }
339 }
340
341 fn is_empty(&self) -> bool {
342 self.tx.is_empty()
343 }
344
345 fn add_task(&mut self, task: TransactionTask<T>) -> Result<(usize, bool), Error> {
347 let task_instructions = task.instructions.as_slice();
348 let mut test_luts = self.lookup_tables.clone();
349 if let Some(luts) = task.lookup_tables.clone() {
350 test_luts.extend(luts);
351 }
352
353 let len = self.tx.transaction_len(task_instructions, &test_luts)?;
355
356 let mut added = false;
357 if len <= MAX_TRANSACTION_SIZE {
359 added = true;
360 self.lookup_tables = test_luts;
361 self.tx.push(task_instructions, 0);
362 self.tasks.push(task);
363 }
364
365 Ok((len, added))
366 }
367}