1use std::{
2 collections::VecDeque,
3 str::FromStr,
4 sync::{Arc, OnceLock},
5 task::{Context, Poll},
6 time::Duration,
7};
8
9use ahash::HashSet;
10use anchor_lang::{AnchorDeserialize, Discriminator};
11use base64::Engine;
12pub use drift_pubsub_client::PubsubClient;
13use futures_util::{future::BoxFuture, stream::FuturesOrdered, FutureExt, Stream, StreamExt};
14use log::{debug, info, warn};
15use regex::Regex;
16pub use solana_rpc_client::nonblocking::rpc_client::RpcClient;
17use solana_rpc_client::rpc_client::GetConfirmedSignaturesForAddress2Config;
18use solana_rpc_client_api::{
19 config::{RpcTransactionConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter},
20 response::RpcLogsResponse,
21};
22pub use solana_sdk::commitment_config::CommitmentConfig;
23use solana_sdk::{pubkey::Pubkey, signature::Signature, transaction::VersionedTransaction};
24use solana_transaction_status::{
25 option_serializer::OptionSerializer, EncodedTransactionWithStatusMeta, UiTransactionEncoding,
26};
27use tokio::{
28 sync::{
29 mpsc::{channel, Receiver, Sender},
30 RwLock,
31 },
32 task::JoinHandle,
33};
34
35use crate::{
36 constants,
37 drift_idl::{
38 events::{FundingPaymentRecord, OrderActionRecord, OrderRecord},
39 types::{MarketType, Order, OrderAction, OrderActionExplanation, PositionDirection},
40 },
41 types::SdkResult,
42};
43
44const LOG_TARGET: &str = "events";
45const EMPTY_SIGNATURE: &str = "1111111111111111111111111111111111111111111111111111111111111111";
46
47impl EventRpcProvider for RpcClient {
48 fn get_tx(
49 &self,
50 signature: Signature,
51 ) -> BoxFuture<SdkResult<EncodedTransactionWithStatusMeta>> {
52 async move {
53 let result = self
54 .get_transaction_with_config(
55 &signature,
56 RpcTransactionConfig {
57 encoding: Some(UiTransactionEncoding::Base64),
58 max_supported_transaction_version: Some(0),
59 ..Default::default()
60 },
61 )
62 .await?;
63
64 Ok(result.transaction)
65 }
66 .boxed()
67 }
68 fn get_tx_signatures(
69 &self,
70 account: Pubkey,
71 after: Option<Signature>,
72 limit: Option<usize>,
73 ) -> BoxFuture<SdkResult<Vec<String>>> {
74 async move {
75 let results = self
76 .get_signatures_for_address_with_config(
77 &account,
78 GetConfirmedSignaturesForAddress2Config {
79 until: after,
80 limit,
81 ..Default::default()
82 },
83 )
84 .await?;
85
86 Ok(results.iter().map(|r| r.signature.clone()).collect())
87 }
88 .boxed()
89 }
90}
91
92pub trait EventRpcProvider: Send + Sync + 'static {
94 fn get_tx_signatures(
98 &self,
99 account: Pubkey,
100 after: Option<Signature>,
101 limit: Option<usize>,
102 ) -> BoxFuture<SdkResult<Vec<String>>>;
103 fn get_tx(
105 &self,
106 signature: Signature,
107 ) -> BoxFuture<SdkResult<EncodedTransactionWithStatusMeta>>;
108}
109
110pub struct EventSubscriber;
112
113impl EventSubscriber {
114 pub async fn subscribe(
120 ws: Arc<PubsubClient>,
121 sub_account: Pubkey,
122 ) -> SdkResult<DriftEventStream> {
123 log_stream(ws, sub_account).await
124 }
125 pub fn subscribe_polled(provider: impl EventRpcProvider, account: Pubkey) -> DriftEventStream {
127 polled_stream(provider, account)
128 }
129}
130
131struct LogEventStream {
132 cache: Arc<RwLock<TxSignatureCache>>,
133 provider: Arc<PubsubClient>,
134 sub_account: Pubkey,
135 event_tx: Sender<DriftEvent>,
136 commitment: CommitmentConfig,
137}
138
139impl LogEventStream {
140 async fn stream_fn(self) {
142 let sub_account = self.sub_account;
143 info!(target: LOG_TARGET, "log stream connecting: {sub_account:?}");
144
145 let subscribe_result = self
146 .provider
147 .logs_subscribe(
148 RpcTransactionLogsFilter::Mentions(vec![self.sub_account.to_string()]),
149 RpcTransactionLogsConfig {
150 commitment: Some(self.commitment),
151 },
152 )
153 .await;
154
155 if let Err(ref err) = subscribe_result {
156 warn!(target: LOG_TARGET, "log subscription failed for: {sub_account:?}. {err:?}");
157 return;
158 }
159
160 let (mut log_stream, _unsub_fn) = subscribe_result.unwrap();
161 debug!(target: LOG_TARGET, "start log subscription: {sub_account:?}");
162
163 while let Some(response) = log_stream.next().await {
164 self.process_log(response.value).await;
165 }
166 warn!(target: LOG_TARGET, "log stream ended: {sub_account:?}");
167 }
168
169 async fn process_log(&self, response: RpcLogsResponse) {
171 let signature = response.signature;
172 if response.err.is_some() {
173 debug!(target: LOG_TARGET, "skipping failed tx: {signature:?}");
174 return;
175 }
176 if signature == EMPTY_SIGNATURE {
177 debug!(target: LOG_TARGET, "skipping empty signature");
178 return;
179 }
180 {
181 let mut cache = self.cache.write().await;
182 if cache.contains(&signature) {
183 debug!(target: LOG_TARGET, "skipping cached tx: {signature:?}");
184 return;
185 }
186 cache.insert(signature.clone());
187 }
188
189 debug!(target: LOG_TARGET, "log extracting events, tx: {signature:?}");
190 for (tx_idx, log) in response.logs.iter().enumerate() {
191 if let Some(event) = try_parse_log(log.as_str(), &signature, tx_idx) {
193 if event.pertains_to(self.sub_account) {
195 if self.event_tx.send(event).await.is_err() {
196 warn!("event receiver closed");
197 return;
198 }
199 }
200 }
201 }
202 }
203}
204
205fn polled_stream(provider: impl EventRpcProvider, sub_account: Pubkey) -> DriftEventStream {
207 let (event_tx, event_rx) = channel(256);
208 let cache = Arc::new(RwLock::new(TxSignatureCache::new(128)));
209 let join_handle = tokio::spawn(
210 PolledEventStream {
211 cache: Arc::clone(&cache),
212 provider,
213 sub_account,
214 event_tx,
215 }
216 .stream_fn(),
217 );
218
219 DriftEventStream {
220 rx: event_rx,
221 task: join_handle,
222 }
223}
224
225async fn log_stream(ws: Arc<PubsubClient>, sub_account: Pubkey) -> SdkResult<DriftEventStream> {
227 debug!(target: LOG_TARGET, "stream events for {sub_account:?}");
228 let (event_tx, event_rx) = channel(256);
229 let cache = Arc::new(RwLock::new(TxSignatureCache::new(256)));
230
231 let join_handle = tokio::spawn(async move {
233 LogEventStream {
234 provider: ws,
235 cache: Arc::clone(&cache),
236 sub_account,
237 event_tx: event_tx.clone(),
238 commitment: CommitmentConfig::confirmed(),
239 }
240 .stream_fn()
241 .await;
242 });
243
244 Ok(DriftEventStream {
245 rx: event_rx,
246 task: join_handle,
247 })
248}
249
250pub struct PolledEventStream<T: EventRpcProvider> {
251 cache: Arc<RwLock<TxSignatureCache>>,
252 event_tx: Sender<DriftEvent>,
253 provider: T,
254 sub_account: Pubkey,
255}
256
257impl<T: EventRpcProvider> PolledEventStream<T> {
258 async fn stream_fn(self) {
259 debug!(target: LOG_TARGET, "poll events for {:?}", self.sub_account);
260 debug!(target: LOG_TARGET, "fetch initial txs");
263 let res = self
264 .provider
265 .get_tx_signatures(self.sub_account, None, Some(1))
266 .await;
267 debug!(target: LOG_TARGET, "fetched initial txs");
268
269 let mut last_seen_tx = res.expect("fetched tx").first().cloned();
270 let provider_ref = &self.provider;
271 'outer: loop {
272 tokio::time::sleep(Duration::from_millis(400)).await;
274
275 debug!(target: LOG_TARGET, "poll txs for events");
276 let signatures = provider_ref
277 .get_tx_signatures(
278 self.sub_account,
279 last_seen_tx
280 .clone()
281 .map(|s| Signature::from_str(&s).unwrap()),
282 None,
283 )
284 .await;
285
286 if let Err(err) = signatures {
287 warn!(target: LOG_TARGET, "poll tx signatures: {err:?}");
288 continue;
289 }
290
291 let signatures = signatures.unwrap();
292 let mut futs = {
295 FuturesOrdered::from_iter(
296 signatures
297 .into_iter()
298 .map(|s| async move {
299 (
300 s.clone(),
301 provider_ref
302 .get_tx(
303 Signature::from_str(s.as_str()).expect("valid signature"),
304 )
305 .await,
306 )
307 })
308 .rev(),
309 )
310 };
311 if futs.is_empty() {
312 continue;
313 }
314
315 while let Some((signature, response)) = futs.next().await {
316 debug!(target: LOG_TARGET, "poll extracting events, tx: {signature:?}");
317 if let Err(err) = response {
318 warn!(target: LOG_TARGET, "poll processing tx: {err:?}");
319 continue 'outer;
321 }
322
323 last_seen_tx = Some(signature.clone());
324 {
325 let mut cache = self.cache.write().await;
326 if cache.contains(&signature) {
327 debug!(target: LOG_TARGET, "poll skipping cached tx: {signature:?}");
328 continue;
329 }
330 cache.insert(signature.clone());
331 }
332
333 let EncodedTransactionWithStatusMeta {
334 meta, transaction, ..
335 } = response.unwrap();
336 if meta.is_none() {
337 continue;
338 }
339 let meta = meta.unwrap();
340
341 if let Some(VersionedTransaction { message, .. }) = transaction.decode() {
342 if !message
344 .static_account_keys()
345 .iter()
346 .any(|k| k == &constants::PROGRAM_ID)
347 {
348 continue;
349 }
350 }
351 if meta.err.is_some() {
353 continue;
354 }
355
356 if let OptionSerializer::Some(logs) = meta.log_messages {
357 for (tx_idx, log) in logs.iter().enumerate() {
358 if let Some(event) = try_parse_log(log.as_str(), signature.as_str(), tx_idx)
359 {
360 if event.pertains_to(self.sub_account) {
361 self.event_tx.try_send(event).expect("sent");
362 }
363 }
364 }
365 }
366 }
367 }
368 }
369}
370
371pub struct DriftEventStream {
373 task: JoinHandle<()>,
375 rx: Receiver<DriftEvent>,
377}
378
379impl DriftEventStream {
380 pub fn unsubscribe(&self) {
382 self.task.abort();
383 }
384}
385
386impl Drop for DriftEventStream {
387 fn drop(&mut self) {
388 self.unsubscribe()
389 }
390}
391
392impl Stream for DriftEventStream {
393 type Item = DriftEvent;
394 fn poll_next(
395 mut self: std::pin::Pin<&mut Self>,
396 cx: &mut Context<'_>,
397 ) -> Poll<Option<Self::Item>> {
398 self.as_mut().rx.poll_recv(cx)
399 }
400}
401
402const PROGRAM_LOG: &str = "Program log: ";
403const PROGRAM_DATA: &str = "Program data: ";
404
405pub fn try_parse_log(raw: &str, signature: &str, tx_idx: usize) -> Option<DriftEvent> {
408 if let Some(log) = raw
410 .strip_prefix(PROGRAM_LOG)
411 .or_else(|| raw.strip_prefix(PROGRAM_DATA))
412 {
413 if let Ok(borsh_bytes) = base64::engine::general_purpose::STANDARD.decode(log) {
414 let (disc, mut data) = borsh_bytes.split_at(8);
415 let disc: [u8; 8] = disc.try_into().unwrap();
416
417 return DriftEvent::from_discriminant(disc, &mut data, signature, tx_idx);
418 }
419
420 let order_cancel_missing_re = ORDER_CANCEL_MISSING_RE
422 .get_or_init(|| Regex::new(r"could not find( user){0,1} order id (\d+)").unwrap());
423 if let Some(captures) = order_cancel_missing_re.captures(log) {
424 let order_id = captures
425 .get(2)
426 .unwrap()
427 .as_str()
428 .parse::<u32>()
429 .expect("<u32");
430 let event = if captures.get(1).is_some() {
431 DriftEvent::OrderCancelMissing {
433 user_order_id: order_id as u8,
434 order_id: 0,
435 signature: signature.to_string(),
436 }
437 } else {
438 DriftEvent::OrderCancelMissing {
440 user_order_id: 0,
441 order_id,
442 signature: signature.to_string(),
443 }
444 };
445
446 return Some(event);
447 }
448 }
449
450 None
451}
452
453static ORDER_CANCEL_MISSING_RE: OnceLock<Regex> = OnceLock::new();
454
455#[derive(Debug, PartialEq)]
457pub enum DriftEvent {
458 OrderFill {
459 maker: Option<Pubkey>,
460 maker_fee: i64,
461 maker_order_id: u32,
462 maker_side: Option<PositionDirection>,
463 taker: Option<Pubkey>,
464 taker_fee: u64,
465 taker_order_id: u32,
466 taker_side: Option<PositionDirection>,
467 base_asset_amount_filled: u64,
468 quote_asset_amount_filled: u64,
469 market_index: u16,
470 market_type: MarketType,
471 oracle_price: i64,
472 signature: String,
473 tx_idx: usize,
474 ts: u64,
475 },
476 OrderCancel {
477 taker: Option<Pubkey>,
478 maker: Option<Pubkey>,
479 taker_order_id: u32,
480 maker_order_id: u32,
481 signature: String,
482 tx_idx: usize,
483 ts: u64,
484 },
485 OrderCancelMissing {
487 user_order_id: u8,
488 order_id: u32,
489 signature: String,
490 },
491 OrderCreate {
492 order: Order,
493 user: Pubkey,
494 ts: u64,
495 signature: String,
496 tx_idx: usize,
497 },
498 OrderExpire {
500 order_id: u32,
501 user: Option<Pubkey>,
502 fee: u64,
503 ts: u64,
504 signature: String,
505 tx_idx: usize,
506 },
507 FundingPayment {
508 amount: i64,
509 market_index: u16,
510 user: Pubkey,
511 ts: u64,
512 signature: String,
513 tx_idx: usize,
514 },
515}
516
517impl DriftEvent {
518 fn pertains_to(&self, sub_account: Pubkey) -> bool {
520 let subject = &Some(sub_account);
521 match self {
522 Self::OrderCancel { maker, taker, .. } | Self::OrderFill { maker, taker, .. } => {
523 maker == subject || taker == subject
524 }
525 Self::OrderCreate { user, .. } => *user == sub_account,
526 Self::OrderExpire { user, .. } => user == subject,
527 Self::OrderCancelMissing { .. } => true,
528 Self::FundingPayment { user, .. } => *user == sub_account,
529 }
530 }
531 fn from_discriminant(
533 disc: [u8; 8],
534 data: &mut &[u8],
535 signature: &str,
536 tx_idx: usize,
537 ) -> Option<Self> {
538 match disc.as_slice() {
539 OrderActionRecord::DISCRIMINATOR => Self::from_oar(
541 OrderActionRecord::deserialize(data).expect("deserializes"),
542 signature,
543 tx_idx,
544 ),
545 OrderRecord::DISCRIMINATOR => Self::from_order_record(
546 OrderRecord::deserialize(data).expect("deserializes"),
547 signature,
548 tx_idx,
549 ),
550 FundingPaymentRecord::DISCRIMINATOR => Some(Self::from_funding_payment_record(
551 FundingPaymentRecord::deserialize(data).expect("deserializes"),
552 signature,
553 tx_idx,
554 )),
555 _ => {
556 debug!(target: LOG_TARGET, "unhandled event: {disc:?}");
557 None
558 }
559 }
560 }
561 fn from_funding_payment_record(
562 value: FundingPaymentRecord,
563 signature: &str,
564 tx_idx: usize,
565 ) -> Self {
566 Self::FundingPayment {
567 amount: value.funding_payment,
568 market_index: value.market_index,
569 ts: value.ts.unsigned_abs(),
570 user: value.user,
571 signature: signature.to_string(),
572 tx_idx,
573 }
574 }
575 fn from_order_record(value: OrderRecord, signature: &str, tx_idx: usize) -> Option<Self> {
576 Some(DriftEvent::OrderCreate {
577 order: value.order,
578 user: value.user,
579 ts: value.ts.unsigned_abs(),
580 signature: signature.to_string(),
581 tx_idx,
582 })
583 }
584 fn from_oar(value: OrderActionRecord, signature: &str, tx_idx: usize) -> Option<Self> {
585 match value.action {
586 OrderAction::Cancel => {
587 if let OrderActionExplanation::OrderExpired = value.action_explanation {
588 Some(DriftEvent::OrderExpire {
590 fee: value.filler_reward.unwrap_or_default(),
591 order_id: value
592 .maker_order_id
593 .or(value.taker_order_id)
594 .expect("order id set"),
595 ts: value.ts.unsigned_abs(),
596 signature: signature.to_string(),
597 tx_idx,
598 user: value.maker.or(value.taker),
599 })
600 } else {
601 Some(DriftEvent::OrderCancel {
602 maker: value.maker,
603 taker: value.taker,
604 maker_order_id: value.maker_order_id.unwrap_or_default(),
605 taker_order_id: value.taker_order_id.unwrap_or_default(),
606 ts: value.ts.unsigned_abs(),
607 signature: signature.to_string(),
608 tx_idx,
609 })
610 }
611 }
612 OrderAction::Fill => Some(DriftEvent::OrderFill {
613 maker: value.maker,
614 maker_fee: value.maker_fee.unwrap_or_default(),
615 maker_order_id: value.maker_order_id.unwrap_or_default(),
616 maker_side: value.maker_order_direction,
617 taker: value.taker,
618 taker_fee: value.taker_fee.unwrap_or_default(),
619 taker_order_id: value.taker_order_id.unwrap_or_default(),
620 taker_side: value.taker_order_direction,
621 base_asset_amount_filled: value.base_asset_amount_filled.unwrap_or_default(),
622 quote_asset_amount_filled: value.quote_asset_amount_filled.unwrap_or_default(),
623 oracle_price: value.oracle_price,
624 market_index: value.market_index,
625 market_type: value.market_type,
626 ts: value.ts.unsigned_abs(),
627 signature: signature.to_string(),
628 tx_idx,
629 }),
630 OrderAction::Place | OrderAction::Expire | OrderAction::Trigger => None,
634 }
635 }
636}
637
638struct TxSignatureCache {
640 capacity: usize,
641 entries: HashSet<String>,
642 age: VecDeque<String>,
643}
644
645impl TxSignatureCache {
646 fn new(capacity: usize) -> Self {
647 Self {
648 capacity,
649 entries: HashSet::<String>::with_capacity_and_hasher(capacity, Default::default()),
650 age: VecDeque::with_capacity(capacity),
651 }
652 }
653 fn contains(&self, x: &str) -> bool {
654 self.entries.contains(x)
655 }
656 fn insert(&mut self, x: String) {
657 self.entries.insert(x.clone());
658 self.age.push_back(x);
659
660 if self.age.len() >= self.capacity {
661 if let Some(ref oldest) = self.age.pop_front() {
662 self.entries.remove(oldest);
663 }
664 }
665 }
666 #[cfg(test)]
667 fn reset(&mut self) {
668 self.entries.clear()
669 }
670}
671
672#[cfg(test)]
673mod test {
674 use ahash::HashMap;
675 use anchor_lang::prelude::*;
676 use base64::Engine;
677 use futures_util::future::ready;
678 use solana_sdk::{
679 hash::Hash,
680 instruction::{AccountMeta, Instruction},
681 message::{v0, VersionedMessage},
682 pubkey::Pubkey,
683 };
684 use solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta};
685 use tokio::sync::Mutex;
686
687 use super::*;
688 use crate::SdkError;
689
690 #[cfg(feature = "rpc_tests")]
691 #[tokio::test]
692 async fn event_streaming_logs() {
693 use crate::async_utils::retry_policy;
694 let mut event_stream = EventSubscriber::subscribe(
695 "wss://api.devnet.solana.com",
696 Pubkey::from_str("9JtczxrJjPM4J1xooxr2rFXmRivarb4BwjNiBgXDwe2p").unwrap(),
697 retry_policy::never(),
698 )
699 .await
700 .unwrap()
701 .take(5);
702
703 while let Some(event) = event_stream.next().await {
704 dbg!(event);
705 }
706 }
707
708 #[tokio::test]
709 async fn log_stream_handles_jit_proxy_events() {
710 let cache = TxSignatureCache::new(16);
711 let (event_tx, mut event_rx) = channel(16);
712
713 let mut log_stream = LogEventStream {
714 cache: Arc::new(cache.into()),
715 provider: Arc::new(
716 PubsubClient::new("wss://api.devnet.solana.com".into())
717 .await
718 .unwrap(),
719 ),
720 sub_account: "GgZkrSFgTAXZn1rNtZ533wpZi6nxx8whJC9bxRESB22c"
721 .try_into()
722 .unwrap(),
723 event_tx,
724 commitment: CommitmentConfig::confirmed(),
725 };
726
727 let logs: Vec<String> = [
728 "Program ComputeBudget111111111111111111111111111111 invoke [1]",
729 "Program ComputeBudget111111111111111111111111111111 success",
730 "Program J1TnP8zvVxbtF5KFp5xRmWuvG9McnhzmBd9XGfCyuxFP invoke [1]",
731 "Program log: Instruction: ArbPerp",
732 "Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH invoke [2]",
733 "Program log: Instruction: PlaceAndTakePerpOrder",
734 "Program log: Invalid Spot 0 Oracle: Stale (oracle_delay=23)",
735 "Program log: 4DRDR8LtbQFOKvplAAAAAAAAGAABAAAAAAAAAAAAAAFGJn8TpIimFlKv8ZWRhmuU81x+ojkf3K4d+++MbslDfAGZcTYAAQEBAM5q/TIAAAABAAAAAAAAAAABAAAAAAAAAAAAAAAAAACTWxEAAAAAAAA=",
736 "Program log: aBNAOFkVAlpOKvplAAAAAEYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8qZQ2DwAAAABMTREAAAAAAADOav0yAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAJlxNgAYAAEBAQAAAQAAAQAAAAAA",
737 "Program log: 4DRDR8LtbQFOKvplAAAAAAIIGAABAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AQAAAAAAAAAAAceaAwAAAAAAAQDOav0yAAAAAQQgzQ4AAAAAAQIjAQAAAAAAAQA+////////AAAAAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AZlxNgABAQEAzmr9MgAAAAEAzmr9MgAAAAEEIM0OAAAAAAHpAf4sI0TDV0Ec0LWHs9mO40bjfKEm3A+yye5HFCQQQQEzPgAAAQABANraQssAAAABANraQssAAAABLJgAOwAAAACTWxEAAAAAAAA=",
738 "Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH consumed 373815 of 1334075 compute units",
739 "Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH success",
740 "Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH invoke [2]",
741 "Program log: Instruction: PlaceAndTakePerpOrder",
742 "Program log: Invalid Spot 0 Oracle: Stale (oracle_delay=23)",
743 "Program log: 4DRDR8LtbQFOKvplAAAAAAAAGAABAAAAAAAAAAAAAAFGJn8TpIimFlKv8ZWRhmuU81x+ojkf3K4d+++MbslDfAGacTYAAQABAM5q/TIAAAABAAAAAAAAAAABAAAAAAAAAAAAAAAAAACTWxEAAAAAAAA=",
744 "Program log: aBNAOFkVAlpOKvplAAAAAEYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8qZQ2DwAAAACAPBEAAAAAAADOav0yAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAJpxNgAYAAEBAQABAAAAAQAAAAAA",
745 "Program log: 4DRDR8LtbQFOKvplAAAAAAIQGAABAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AQAAAAAAAAAAAciaAwAAAAAAAQDgBS0LAAAAAQBYOwMAAAAAAYs/AAAAAAAAAAAB+Ejx//////8AAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AZpxNgABAAEAzmr9MgAAAAEA4AUtCwAAAAEAWDsDAAAAAAAAAAAAAJNbEQAAAAAAAA==",
746 "Program log: 4DRDR8LtbQFOKvplAAAAAAIIGAABAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AQAAAAAAAAAAAcmaAwAAAAAAAQDuZNAnAAAAAYBpgwsAAAAAAV3iAAAAAAAAARhp////////AAAAAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AZpxNgABAAEAzmr9MgAAAAEAzmr9MgAAAAGAwb4OAAAAAAFmQRGN8PRJqt5D5pVvCspbc3f0ZBdTB1Kcw0YfuzxCOAH2/poHAQEBAIjmn+sAAAABAFrDjp4AAAABgPDZLQAAAACTWxEAAAAAAAA=",
747 "Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH consumed 269624 of 934786 compute units",
748 "Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH success",
749 "Program log: pnl 792986",
750 "Program J1TnP8zvVxbtF5KFp5xRmWuvG9McnhzmBd9XGfCyuxFP consumed 738458 of 1399850 compute units",
751 "Program J1TnP8zvVxbtF5KFp5xRmWuvG9McnhzmBd9XGfCyuxFP success",
752 ].into_iter().map(Into::into).collect();
753
754 log_stream.process_log(RpcLogsResponse {
755 signature: "2jLk34wWwgecuws9iD9Ug63JdL8kYBePdtcakzG34zEx9KYVYD6HuokxMZYpFw799cJZBcaCMZ47WAxkGJjM7zNC".into(),
756 err: None,
757 logs: logs.clone(),
758 }).await;
759
760 assert_eq!(
762 event_rx.try_recv().expect("one event"),
763 DriftEvent::OrderFill {
764 maker: Some(
765 "GgZkrSFgTAXZn1rNtZ533wpZi6nxx8whJC9bxRESB22c".try_into().unwrap(),
766 ),
767 maker_fee: -49664,
768 maker_order_id: 15923,
769 maker_side: Some(
770 PositionDirection::Long,
771 ),
772 taker: Some(
773 "5iqawn52cdBmsjC4hDegyFnX1iNRTNDV5mRsGzgqbuyD".try_into().unwrap(),
774 ),
775 taker_fee: 74498,
776 taker_order_id: 3568025,
777 taker_side: Some(
778 PositionDirection::Short,
779 ),
780 base_asset_amount_filled: 219000000000,
781 quote_asset_amount_filled: 248324100,
782 market_index: 24,
783 market_type: MarketType::Perp,
784 oracle_price: 1137555,
785 signature: "2jLk34wWwgecuws9iD9Ug63JdL8kYBePdtcakzG34zEx9KYVYD6HuokxMZYpFw799cJZBcaCMZ47WAxkGJjM7zNC".into(),
786 tx_idx: 9,
787 ts: 1710893646,
788 }
789 );
790 assert!(event_rx.try_recv().is_err()); log_stream.sub_account = "5iqawn52cdBmsjC4hDegyFnX1iNRTNDV5mRsGzgqbuyD"
795 .try_into()
796 .unwrap();
797 log_stream.cache.write().await.reset();
798
799 log_stream.process_log(RpcLogsResponse {
800 signature: "2jLk34wWwgecuws9iD9Ug63JdL8kYBePdtcakzG34zEx9KYVYD6HuokxMZYpFw799cJZBcaCMZ47WAxkGJjM7zNC".into(),
801 err: None,
802 logs: logs.clone(),
803 }).await;
804
805 assert!(event_rx.try_recv().is_ok()); assert!(event_rx.try_recv().is_ok()); assert!(event_rx.try_recv().is_ok()); assert!(event_rx.try_recv().is_ok()); assert!(event_rx.try_recv().is_ok()); assert!(event_rx.try_recv().is_err()); }
812
813 #[test]
814 fn test_log() {
815 let result = try_parse_log("Program log: 4DRDR8LtbQH+x7JlAAAAAAIIAAABAbpHl8YM/aWjrjfQ48x0R2DclPigyXtYx+5d/vSVjUIZAQoCAAAAAAAAAaJhIgAAAAAAAQDC6wsAAAAAAZjQCQEAAAAAAWsUAAAAAAAAAWTy////////AAAAAaNzGgMga9TnxjVkycO4bmqSGjK6kP92OrKdZMYqFV+aAS4eKQ4BAQEAHkHaNAAAAAEAwusLAAAAAAGY0AkBAAAAAAFneQwBwHPUIY9ykEdbxsTV7Lh6K+vISfq8nLCTm/rWoAHwCQAAAQABAMLrCwAAAAABAMLrCwAAAAABmNAJAQAAAAA9Zy8FAAAAAAA=", "sig", 0);
816 dbg!(result);
817 }
818
819 #[test]
820 fn parses_jit_proxy_logs() {
821 let cpi_logs = &[
822 "Program log: 4DRDR8LtbQFOKvplAAAAAAAAGAABAAAAAAAAAAAAAAFGJn8TpIimFlKv8ZWRhmuU81x+ojkf3K4d+++MbslDfAGZcTYAAQEBAM5q/TIAAAABAAAAAAAAAAABAAAAAAAAAAAAAAAAAACTWxEAAAAAAAAA",
823 "Program log: aBNAOFkVAlpOKvplAAAAAEYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8qZQ2DwAAAABMTREAAAAAAADOav0yAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAJlxNgAYAAEBAQAAAQAAAQAAAAAA",
824 "Program log: 4DRDR8LtbQFOKvplAAAAAAIIGAABAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AQAAAAAAAAAAAceaAwAAAAAAAQDOav0yAAAAAQQgzQ4AAAAAAQIjAQAAAAAAAQA+////////AAAAAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AZlxNgABAQEAzmr9MgAAAAEAzmr9MgAAAAEEIM0OAAAAAAHpAf4sI0TDV0Ec0LWHs9mO40bjfKEm3A+yye5HFCQQQQEzPgAAAQABANraQssAAAABANraQssAAAABLJgAOwAAAACTWxEAAAAAAAA=",
825 "Program log: 4DRDR8LtbQFOKvplAAAAAAAAGAABAAAAAAAAAAAAAAFGJn8TpIimFlKv8ZWRhmuU81x+ojkf3K4d+++MbslDfAGacTYAAQABAM5q/TIAAAABAAAAAAAAAAABAAAAAAAAAAAAAAAAAACTWxEAAAAAAAA=",
826 "Program log: aBNAOFkVAlpOKvplAAAAAEYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8qZQ2DwAAAACAPBEAAAAAAADOav0yAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAJpxNgAYAAEBAQABAAAAAQAAAAAA",
827 "Program log: 4DRDR8LtbQFOKvplAAAAAAIQGAABAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AQAAAAAAAAAAAciaAwAAAAAAAQDgBS0LAAAAAQBYOwMAAAAAAYs/AAAAAAAAAAAB+Ejx//////8AAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AZpxNgABAAEAzmr9MgAAAAEA4AUtCwAAAAEAWDsDAAAAAAAAAAAAAJNbEQAAAAAAAA==",
828 "Program log: 4DRDR8LtbQFOKvplAAAAAAIIGAABAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AQAAAAAAAAAAAcmaAwAAAAAAAQDuZNAnAAAAAYBpgwsAAAAAAV3iAAAAAAAAARhp////////AAAAAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AZpxNgABAAEAzmr9MgAAAAEAzmr9MgAAAAGAwb4OAAAAAAFmQRGN8PRJqt5D5pVvCspbc3f0ZBdTB1Kcw0YfuzxCOAH2/poHAQEBAIjmn+sAAAABAFrDjp4AAAABgPDZLQAAAACTWxEAAAAAAAA=",
829 ];
830
831 for log in cpi_logs {
832 let result = try_parse_log(log, "sig", 0);
833 dbg!(log, result);
834 }
835 }
836
837 #[tokio::test]
838 async fn polled_event_stream_caching() {
839 let _ = env_logger::try_init();
840 struct MockRpcProvider {
841 tx_responses: HashMap<String, EncodedTransactionWithStatusMeta>,
842 signatures: tokio::sync::Mutex<Vec<String>>,
843 }
844
845 impl MockRpcProvider {
846 async fn add_signatures(&self, signatures: Vec<String>) {
847 let mut all_signatures = self.signatures.lock().await;
848 all_signatures.extend(signatures.into_iter());
849 }
850 }
851
852 impl EventRpcProvider for Arc<MockRpcProvider> {
853 fn get_tx(
854 &self,
855 signature: Signature,
856 ) -> BoxFuture<SdkResult<EncodedTransactionWithStatusMeta>> {
857 ready(
858 self.tx_responses
859 .get(signature.to_string().as_str())
860 .ok_or(SdkError::Deserializing)
861 .cloned(),
862 )
863 .boxed()
864 }
865 fn get_tx_signatures(
866 &self,
867 _account: Pubkey,
868 after: Option<Signature>,
869 _limit: Option<usize>,
870 ) -> BoxFuture<SdkResult<Vec<String>>> {
871 async move {
872 let after = after.map(|s| s.to_string());
873 let mut self_signatures = self.signatures.lock().await;
874 if after.is_none() {
875 return Ok(self_signatures.clone());
876 }
877
878 if let Some(idx) = self_signatures
879 .iter()
880 .position(|s| Some(s) == after.as_ref())
881 {
882 if idx > 0 {
883 *self_signatures = self_signatures[..idx].to_vec();
885 } else {
886 self_signatures.clear();
887 }
888 }
889
890 Ok(self_signatures.clone())
891 }
892 .boxed()
893 }
894 }
895
896 let (event_tx, mut event_rx) = channel(16);
897 let sub_account = Pubkey::new_unique();
898 let cache = Arc::new(RwLock::new(TxSignatureCache::new(16)));
899
900 let mut order_events: Vec<(OrderActionRecord, OrderRecord)> = (0..5)
901 .map(|id| {
902 (
903 get_order_action_record(
904 id as i64,
905 OrderAction::Place,
906 OrderActionExplanation::None,
907 0,
908 None,
909 None,
910 None,
911 None,
912 None,
913 None,
914 None,
915 None,
916 None,
917 None,
918 None,
919 None,
920 Some(sub_account.clone()),
921 Some(Order {
922 order_id: id,
923 ..Default::default()
924 }),
925 0,
926 0,
927 ),
928 OrderRecord {
929 ts: id as i64,
930 user: sub_account,
931 order: Order {
932 order_id: id,
933 ..Default::default()
934 },
935 },
936 )
937 })
938 .collect();
939 let signatures: Vec<String> = (0..order_events.len())
940 .map(|_| Signature::new_unique().to_string())
941 .collect();
942 let mut tx_responses = HashMap::<String, EncodedTransactionWithStatusMeta>::default();
943 for s in signatures.iter() {
944 let (oar, or) = order_events.pop().unwrap();
945 tx_responses.insert(
946 s.clone(),
947 make_transaction(
948 sub_account,
949 Signature::from_str(s).unwrap(),
950 Some(vec![
951 format!("{PROGRAM_LOG}{}", serialize_event(oar)),
952 format!("{PROGRAM_LOG}{}", serialize_event(or),),
953 ]),
954 ),
955 );
956 }
957
958 let mock_rpc_provider = Arc::new(MockRpcProvider {
959 tx_responses,
960 signatures: Mutex::new(vec![signatures.first().unwrap().clone()]),
961 });
962
963 tokio::spawn(
964 PolledEventStream {
965 cache: Arc::clone(&cache),
966 provider: Arc::clone(&mock_rpc_provider),
967 sub_account,
968 event_tx,
969 }
970 .stream_fn(),
971 );
972 tokio::time::sleep(Duration::from_secs(1)).await;
973
974 {
979 let mut cache_ = cache.write().await;
980 cache_.insert(signatures[1].clone());
981 cache_.insert(signatures[4].clone());
982 }
983 mock_rpc_provider
984 .add_signatures(signatures[1..].to_vec())
985 .await;
986 tokio::time::sleep(Duration::from_secs(1)).await;
987
988 assert!(event_rx.recv().await.is_some_and(|f| {
989 if let DriftEvent::OrderCreate { order, .. } = f {
990 println!("{}", order.order_id);
991 order.order_id == 1
992 } else {
993 false
994 }
995 }));
996 assert!(event_rx.recv().await.is_some_and(|f| {
997 if let DriftEvent::OrderCreate { order, .. } = f {
998 println!("{}", order.order_id);
999 order.order_id == 2
1000 } else {
1001 false
1002 }
1003 }));
1004 tokio::time::sleep(Duration::from_secs(1)).await;
1005 assert!(event_rx.try_recv().is_err());
1006 }
1007
1008 fn make_transaction(
1010 account: Pubkey,
1011 signature: Signature,
1012 logs: Option<Vec<String>>,
1013 ) -> EncodedTransactionWithStatusMeta {
1014 let mut meta = TransactionStatusMeta::default();
1015 meta.log_messages = logs;
1016 VersionedTransactionWithStatusMeta {
1017 transaction: VersionedTransaction {
1018 signatures: vec![signature],
1019 message: VersionedMessage::V0(
1020 v0::Message::try_compile(
1021 &account,
1022 &[Instruction {
1023 program_id: constants::PROGRAM_ID,
1024 accounts: vec![AccountMeta::new_readonly(constants::PROGRAM_ID, true)],
1025 data: Default::default(),
1026 }],
1027 &[],
1028 Hash::new_unique(),
1029 )
1030 .expect("v0 message"),
1031 ),
1032 },
1033 meta,
1034 }
1035 .encode(UiTransactionEncoding::Base64, Some(0), false)
1036 .unwrap()
1037 }
1038
1039 pub fn serialize_event<T: AnchorSerialize + Discriminator>(event: T) -> String {
1041 let mut data_buf = T::DISCRIMINATOR.to_vec();
1042 event.serialize(&mut data_buf).expect("serializes");
1043 base64::engine::general_purpose::STANDARD.encode(data_buf)
1044 }
1045
1046 pub fn get_order_action_record(
1047 ts: i64,
1048 action: OrderAction,
1049 action_explanation: OrderActionExplanation,
1050 market_index: u16,
1051 filler: Option<Pubkey>,
1052 fill_record_id: Option<u64>,
1053 filler_reward: Option<u64>,
1054 base_asset_amount_filled: Option<u64>,
1055 quote_asset_amount_filled: Option<u64>,
1056 taker_fee: Option<u64>,
1057 maker_rebate: Option<u64>,
1058 referrer_reward: Option<u64>,
1059 quote_asset_amount_surplus: Option<i64>,
1060 spot_fulfillment_method_fee: Option<u64>,
1061 taker: Option<Pubkey>,
1062 taker_order: Option<Order>,
1063 maker: Option<Pubkey>,
1064 maker_order: Option<Order>,
1065 oracle_price: i64,
1066 bit_flags: u8,
1067 ) -> OrderActionRecord {
1068 OrderActionRecord {
1069 bit_flags,
1070 ts,
1071 action,
1072 action_explanation,
1073 market_index,
1074 market_type: if let Some(taker_order) = taker_order {
1075 taker_order.market_type
1076 } else if let Some(maker_order) = maker_order {
1077 maker_order.market_type
1078 } else {
1079 panic!("inalid order");
1080 },
1081 filler,
1082 filler_reward,
1083 fill_record_id,
1084 base_asset_amount_filled,
1085 quote_asset_amount_filled,
1086 taker_fee,
1087 maker_fee: match maker_rebate {
1088 Some(maker_rebate) => Some(maker_rebate as i64),
1089 None => None,
1090 },
1091 referrer_reward: match referrer_reward {
1092 Some(referrer_reward) if referrer_reward > 0 => {
1093 Some(referrer_reward.try_into().unwrap())
1094 }
1095 _ => None,
1096 },
1097 quote_asset_amount_surplus,
1098 spot_fulfillment_method_fee,
1099 taker,
1100 taker_order_id: taker_order.map(|order| order.order_id),
1101 taker_order_direction: taker_order.map(|order| order.direction),
1102 taker_order_base_asset_amount: taker_order.map(|order| order.base_asset_amount),
1103 taker_order_cumulative_base_asset_amount_filled: taker_order
1104 .map(|order| order.base_asset_amount_filled),
1105 taker_order_cumulative_quote_asset_amount_filled: taker_order
1106 .as_ref()
1107 .map(|order| order.quote_asset_amount_filled),
1108 maker,
1109 maker_order_id: maker_order.map(|order| order.order_id),
1110 maker_order_direction: maker_order.map(|order| order.direction),
1111 maker_order_base_asset_amount: maker_order.map(|order| order.base_asset_amount),
1112 maker_order_cumulative_base_asset_amount_filled: maker_order
1113 .map(|order| order.base_asset_amount_filled),
1114 maker_order_cumulative_quote_asset_amount_filled: maker_order
1115 .map(|order| order.quote_asset_amount_filled),
1116 oracle_price,
1117 }
1118 }
1119}