drift_rs/
event_subscriber.rs

1use std::{
2    collections::VecDeque,
3    str::FromStr,
4    sync::{Arc, OnceLock},
5    task::{Context, Poll},
6    time::Duration,
7};
8
9use ahash::HashSet;
10use anchor_lang::{AnchorDeserialize, Discriminator};
11use base64::Engine;
12pub use drift_pubsub_client::PubsubClient;
13use futures_util::{future::BoxFuture, stream::FuturesOrdered, FutureExt, Stream, StreamExt};
14use log::{debug, info, warn};
15use regex::Regex;
16pub use solana_rpc_client::nonblocking::rpc_client::RpcClient;
17use solana_rpc_client::rpc_client::GetConfirmedSignaturesForAddress2Config;
18use solana_rpc_client_api::{
19    config::{RpcTransactionConfig, RpcTransactionLogsConfig, RpcTransactionLogsFilter},
20    response::RpcLogsResponse,
21};
22pub use solana_sdk::commitment_config::CommitmentConfig;
23use solana_sdk::{pubkey::Pubkey, signature::Signature, transaction::VersionedTransaction};
24use solana_transaction_status::{
25    option_serializer::OptionSerializer, EncodedTransactionWithStatusMeta, UiTransactionEncoding,
26};
27use tokio::{
28    sync::{
29        mpsc::{channel, Receiver, Sender},
30        RwLock,
31    },
32    task::JoinHandle,
33};
34
35use crate::{
36    constants,
37    drift_idl::{
38        events::{FundingPaymentRecord, OrderActionRecord, OrderRecord},
39        types::{MarketType, Order, OrderAction, OrderActionExplanation, PositionDirection},
40    },
41    types::SdkResult,
42};
43
44const LOG_TARGET: &str = "events";
45const EMPTY_SIGNATURE: &str = "1111111111111111111111111111111111111111111111111111111111111111";
46
47impl EventRpcProvider for RpcClient {
48    fn get_tx(
49        &self,
50        signature: Signature,
51    ) -> BoxFuture<SdkResult<EncodedTransactionWithStatusMeta>> {
52        async move {
53            let result = self
54                .get_transaction_with_config(
55                    &signature,
56                    RpcTransactionConfig {
57                        encoding: Some(UiTransactionEncoding::Base64),
58                        max_supported_transaction_version: Some(0),
59                        ..Default::default()
60                    },
61                )
62                .await?;
63
64            Ok(result.transaction)
65        }
66        .boxed()
67    }
68    fn get_tx_signatures(
69        &self,
70        account: Pubkey,
71        after: Option<Signature>,
72        limit: Option<usize>,
73    ) -> BoxFuture<SdkResult<Vec<String>>> {
74        async move {
75            let results = self
76                .get_signatures_for_address_with_config(
77                    &account,
78                    GetConfirmedSignaturesForAddress2Config {
79                        until: after,
80                        limit,
81                        ..Default::default()
82                    },
83                )
84                .await?;
85
86            Ok(results.iter().map(|r| r.signature.clone()).collect())
87        }
88        .boxed()
89    }
90}
91
/// RPC functions required for drift event subscriptions
///
/// Implementors back the polled event stream: one call lists the tx signatures
/// of an account, the other fetches a single tx by signature.
pub trait EventRpcProvider: Send + Sync + 'static {
    /// Fetch tx signatures of `account`
    ///
    /// * `after` - only return txs more recent than this signature, if given
    /// * `limit` - return at most this many signatures, if given
    ///
    /// Returns the signatures as strings; the polled stream treats the result
    /// as ordered newest to oldest.
    fn get_tx_signatures(
        &self,
        account: Pubkey,
        after: Option<Signature>,
        limit: Option<usize>,
    ) -> BoxFuture<SdkResult<Vec<String>>>;
    /// Fetch the tx identified by `signature`, including its status metadata
    fn get_tx(
        &self,
        signature: Signature,
    ) -> BoxFuture<SdkResult<EncodedTransactionWithStatusMeta>>;
}
109
/// Provides sub-account event streaming
///
/// Stateless entry point: its associated functions build a [`DriftEventStream`]
/// backed either by a Ws log subscription or by RPC polling.
pub struct EventSubscriber;
112
113impl EventSubscriber {
114    /// Subscribe to drift events of `sub_account`, backed by Ws APIs
115    ///
116    /// * `sub_account` - pubkey of the user's sub-account to subscribe to
117    ///
118    /// Returns a stream of events
119    pub async fn subscribe(
120        ws: Arc<PubsubClient>,
121        sub_account: Pubkey,
122    ) -> SdkResult<DriftEventStream> {
123        log_stream(ws, sub_account).await
124    }
125    /// Subscribe to drift events of `sub_account`, backed by RPC polling APIs
126    pub fn subscribe_polled(provider: impl EventRpcProvider, account: Pubkey) -> DriftEventStream {
127        polled_stream(provider, account)
128    }
129}
130
/// State for a Ws `logsSubscribe` backed event stream of one sub-account
struct LogEventStream {
    /// recently processed tx signatures, used to skip duplicate log notifications
    cache: Arc<RwLock<TxSignatureCache>>,
    /// pubsub client the log subscription is opened on
    provider: Arc<PubsubClient>,
    /// sub-account whose events are emitted
    sub_account: Pubkey,
    /// sending half of the channel read by `DriftEventStream`
    event_tx: Sender<DriftEvent>,
    /// commitment level requested for the log subscription
    commitment: CommitmentConfig,
}
138
139impl LogEventStream {
140    /// Returns a future for running the configured log event stream
141    async fn stream_fn(self) {
142        let sub_account = self.sub_account;
143        info!(target: LOG_TARGET, "log stream connecting: {sub_account:?}");
144
145        let subscribe_result = self
146            .provider
147            .logs_subscribe(
148                RpcTransactionLogsFilter::Mentions(vec![self.sub_account.to_string()]),
149                RpcTransactionLogsConfig {
150                    commitment: Some(self.commitment),
151                },
152            )
153            .await;
154
155        if let Err(ref err) = subscribe_result {
156            warn!(target: LOG_TARGET, "log subscription failed for: {sub_account:?}. {err:?}");
157            return;
158        }
159
160        let (mut log_stream, _unsub_fn) = subscribe_result.unwrap();
161        debug!(target: LOG_TARGET, "start log subscription: {sub_account:?}");
162
163        while let Some(response) = log_stream.next().await {
164            self.process_log(response.value).await;
165        }
166        warn!(target: LOG_TARGET, "log stream ended: {sub_account:?}");
167    }
168
169    /// Process a log response from RPC, emitting any relevant events
170    async fn process_log(&self, response: RpcLogsResponse) {
171        let signature = response.signature;
172        if response.err.is_some() {
173            debug!(target: LOG_TARGET, "skipping failed tx: {signature:?}");
174            return;
175        }
176        if signature == EMPTY_SIGNATURE {
177            debug!(target: LOG_TARGET, "skipping empty signature");
178            return;
179        }
180        {
181            let mut cache = self.cache.write().await;
182            if cache.contains(&signature) {
183                debug!(target: LOG_TARGET, "skipping cached tx: {signature:?}");
184                return;
185            }
186            cache.insert(signature.clone());
187        }
188
189        debug!(target: LOG_TARGET, "log extracting events, tx: {signature:?}");
190        for (tx_idx, log) in response.logs.iter().enumerate() {
191            // a drift sub-account should not interact with any other program by definition
192            if let Some(event) = try_parse_log(log.as_str(), &signature, tx_idx) {
193                // unrelated events from same tx should not be emitted e.g. a filler tx which produces other fill events
194                if event.pertains_to(self.sub_account) {
195                    if self.event_tx.send(event).await.is_err() {
196                        warn!("event receiver closed");
197                        return;
198                    }
199                }
200            }
201        }
202    }
203}
204
205/// Creates a poll-ed stream using JSON-RPC interfaces
206fn polled_stream(provider: impl EventRpcProvider, sub_account: Pubkey) -> DriftEventStream {
207    let (event_tx, event_rx) = channel(256);
208    let cache = Arc::new(RwLock::new(TxSignatureCache::new(128)));
209    let join_handle = tokio::spawn(
210        PolledEventStream {
211            cache: Arc::clone(&cache),
212            provider,
213            sub_account,
214            event_tx,
215        }
216        .stream_fn(),
217    );
218
219    DriftEventStream {
220        rx: event_rx,
221        task: join_handle,
222    }
223}
224
225/// Creates a Ws-backed event stream using `logsSubscribe` interface
226async fn log_stream(ws: Arc<PubsubClient>, sub_account: Pubkey) -> SdkResult<DriftEventStream> {
227    debug!(target: LOG_TARGET, "stream events for {sub_account:?}");
228    let (event_tx, event_rx) = channel(256);
229    let cache = Arc::new(RwLock::new(TxSignatureCache::new(256)));
230
231    // spawn the event subscription task
232    let join_handle = tokio::spawn(async move {
233        LogEventStream {
234            provider: ws,
235            cache: Arc::clone(&cache),
236            sub_account,
237            event_tx: event_tx.clone(),
238            commitment: CommitmentConfig::confirmed(),
239        }
240        .stream_fn()
241        .await;
242    });
243
244    Ok(DriftEventStream {
245        rx: event_rx,
246        task: join_handle,
247    })
248}
249
/// State for an RPC polling backed event stream of one sub-account
pub struct PolledEventStream<T: EventRpcProvider> {
    /// recently processed tx signatures, used to skip txs that were already handled
    cache: Arc<RwLock<TxSignatureCache>>,
    /// sending half of the channel read by `DriftEventStream`
    event_tx: Sender<DriftEvent>,
    /// RPC implementation used to list signatures and fetch txs
    provider: T,
    /// sub-account whose events are emitted
    sub_account: Pubkey,
}
256
257impl<T: EventRpcProvider> PolledEventStream<T> {
258    async fn stream_fn(self) {
259        debug!(target: LOG_TARGET, "poll events for {:?}", self.sub_account);
260        // poll for events in any tx after this tx
261        // initially fetch the most recent tx from account
262        debug!(target: LOG_TARGET, "fetch initial txs");
263        let res = self
264            .provider
265            .get_tx_signatures(self.sub_account, None, Some(1))
266            .await;
267        debug!(target: LOG_TARGET, "fetched initial txs");
268
269        let mut last_seen_tx = res.expect("fetched tx").first().cloned();
270        let provider_ref = &self.provider;
271        'outer: loop {
272            // don't needlessly spam the RPC or hog the executor
273            tokio::time::sleep(Duration::from_millis(400)).await;
274
275            debug!(target: LOG_TARGET, "poll txs for events");
276            let signatures = provider_ref
277                .get_tx_signatures(
278                    self.sub_account,
279                    last_seen_tx
280                        .clone()
281                        .map(|s| Signature::from_str(&s).unwrap()),
282                    None,
283                )
284                .await;
285
286            if let Err(err) = signatures {
287                warn!(target: LOG_TARGET, "poll tx signatures: {err:?}");
288                continue;
289            }
290
291            let signatures = signatures.unwrap();
292            // txs from RPC are ordered newest to oldest
293            // process in reverse order, so subscribers receive events in chronological order
294            let mut futs = {
295                FuturesOrdered::from_iter(
296                    signatures
297                        .into_iter()
298                        .map(|s| async move {
299                            (
300                                s.clone(),
301                                provider_ref
302                                    .get_tx(
303                                        Signature::from_str(s.as_str()).expect("valid signature"),
304                                    )
305                                    .await,
306                            )
307                        })
308                        .rev(),
309                )
310            };
311            if futs.is_empty() {
312                continue;
313            }
314
315            while let Some((signature, response)) = futs.next().await {
316                debug!(target: LOG_TARGET, "poll extracting events, tx: {signature:?}");
317                if let Err(err) = response {
318                    warn!(target: LOG_TARGET, "poll processing tx: {err:?}");
319                    // retry querying the batch
320                    continue 'outer;
321                }
322
323                last_seen_tx = Some(signature.clone());
324                {
325                    let mut cache = self.cache.write().await;
326                    if cache.contains(&signature) {
327                        debug!(target: LOG_TARGET, "poll skipping cached tx: {signature:?}");
328                        continue;
329                    }
330                    cache.insert(signature.clone());
331                }
332
333                let EncodedTransactionWithStatusMeta {
334                    meta, transaction, ..
335                } = response.unwrap();
336                if meta.is_none() {
337                    continue;
338                }
339                let meta = meta.unwrap();
340
341                if let Some(VersionedTransaction { message, .. }) = transaction.decode() {
342                    // only txs interacting with drift program
343                    if !message
344                        .static_account_keys()
345                        .iter()
346                        .any(|k| k == &constants::PROGRAM_ID)
347                    {
348                        continue;
349                    }
350                }
351                // ignore failed txs
352                if meta.err.is_some() {
353                    continue;
354                }
355
356                if let OptionSerializer::Some(logs) = meta.log_messages {
357                    for (tx_idx, log) in logs.iter().enumerate() {
358                        if let Some(event) = try_parse_log(log.as_str(), signature.as_str(), tx_idx)
359                        {
360                            if event.pertains_to(self.sub_account) {
361                                self.event_tx.try_send(event).expect("sent");
362                            }
363                        }
364                    }
365                }
366            }
367        }
368    }
369}
370
/// Provides a stream API of drift sub-account events
///
/// Events are produced by a spawned background task and read from a channel;
/// the task is aborted on `unsubscribe` and when the stream is dropped.
pub struct DriftEventStream {
    /// handle to end the stream task
    task: JoinHandle<()>,
    /// channel of events from stream task
    rx: Receiver<DriftEvent>,
}
378
379impl DriftEventStream {
380    /// End the event stream
381    pub fn unsubscribe(&self) {
382        self.task.abort();
383    }
384}
385
386impl Drop for DriftEventStream {
387    fn drop(&mut self) {
388        self.unsubscribe()
389    }
390}
391
392impl Stream for DriftEventStream {
393    type Item = DriftEvent;
394    fn poll_next(
395        mut self: std::pin::Pin<&mut Self>,
396        cx: &mut Context<'_>,
397    ) -> Poll<Option<Self::Item>> {
398        self.as_mut().rx.poll_recv(cx)
399    }
400}
401
402const PROGRAM_LOG: &str = "Program log: ";
403const PROGRAM_DATA: &str = "Program data: ";
404
405/// Try deserialize a drift event type from raw log string
406/// https://github.com/coral-xyz/anchor/blob/9d947cb26b693e85e1fd26072bb046ff8f95bdcf/client/src/lib.rs#L552
407pub fn try_parse_log(raw: &str, signature: &str, tx_idx: usize) -> Option<DriftEvent> {
408    // Log emitted from the current program.
409    if let Some(log) = raw
410        .strip_prefix(PROGRAM_LOG)
411        .or_else(|| raw.strip_prefix(PROGRAM_DATA))
412    {
413        if let Ok(borsh_bytes) = base64::engine::general_purpose::STANDARD.decode(log) {
414            let (disc, mut data) = borsh_bytes.split_at(8);
415            let disc: [u8; 8] = disc.try_into().unwrap();
416
417            return DriftEvent::from_discriminant(disc, &mut data, signature, tx_idx);
418        }
419
420        // experimental
421        let order_cancel_missing_re = ORDER_CANCEL_MISSING_RE
422            .get_or_init(|| Regex::new(r"could not find( user){0,1} order id (\d+)").unwrap());
423        if let Some(captures) = order_cancel_missing_re.captures(log) {
424            let order_id = captures
425                .get(2)
426                .unwrap()
427                .as_str()
428                .parse::<u32>()
429                .expect("<u32");
430            let event = if captures.get(1).is_some() {
431                // cancel by user order Id
432                DriftEvent::OrderCancelMissing {
433                    user_order_id: order_id as u8,
434                    order_id: 0,
435                    signature: signature.to_string(),
436                }
437            } else {
438                // cancel by order id
439                DriftEvent::OrderCancelMissing {
440                    user_order_id: 0,
441                    order_id,
442                    signature: signature.to_string(),
443                }
444            };
445
446            return Some(event);
447        }
448    }
449
450    None
451}
452
453static ORDER_CANCEL_MISSING_RE: OnceLock<Regex> = OnceLock::new();
454
/// Enum of all drift program events
///
/// Where present, `signature` is the signature of the tx that produced the
/// event and `tx_idx` is the index of the originating log line within that tx.
#[derive(Debug, PartialEq)]
pub enum DriftEvent {
    /// An order fill, built from an `OrderActionRecord` with action `Fill`
    OrderFill {
        maker: Option<Pubkey>,
        maker_fee: i64,
        maker_order_id: u32,
        maker_side: Option<PositionDirection>,
        taker: Option<Pubkey>,
        taker_fee: u64,
        taker_order_id: u32,
        taker_side: Option<PositionDirection>,
        base_asset_amount_filled: u64,
        quote_asset_amount_filled: u64,
        market_index: u16,
        market_type: MarketType,
        oracle_price: i64,
        signature: String,
        tx_idx: usize,
        ts: u64,
    },
    /// An order cancel, built from an `OrderActionRecord` with action `Cancel`
    /// whose explanation is not `OrderExpired`
    OrderCancel {
        taker: Option<Pubkey>,
        maker: Option<Pubkey>,
        taker_order_id: u32,
        maker_order_id: u32,
        signature: String,
        tx_idx: usize,
        ts: u64,
    },
    /// An order cancel for a missing order Id / user order id
    ///
    /// Parsed from a "could not find order id" log message; only the id kind
    /// named in the message is populated, the other is 0
    OrderCancelMissing {
        user_order_id: u8,
        order_id: u32,
        signature: String,
    },
    /// An order placement, built from an `OrderRecord`
    OrderCreate {
        order: Order,
        user: Pubkey,
        ts: u64,
        signature: String,
        tx_idx: usize,
    },
    // sub-case of cancel?
    /// An order expiry, built from an `OrderActionRecord` with action `Cancel`
    /// and explanation `OrderExpired`; `fee` is the record's filler reward
    OrderExpire {
        order_id: u32,
        user: Option<Pubkey>,
        fee: u64,
        ts: u64,
        signature: String,
        tx_idx: usize,
    },
    /// A funding payment, built from a `FundingPaymentRecord`
    FundingPayment {
        amount: i64,
        market_index: u16,
        user: Pubkey,
        ts: u64,
        signature: String,
        tx_idx: usize,
    },
}
516
impl DriftEvent {
    /// Return true if the event is connected to sub-account
    ///
    /// Maker/taker events match when either side is `sub_account`, user events
    /// when the user is `sub_account`. `OrderCancelMissing` carries no account
    /// and always matches.
    fn pertains_to(&self, sub_account: Pubkey) -> bool {
        // wrapped once so it can be compared against the `Option<Pubkey>` fields
        let subject = &Some(sub_account);
        match self {
            Self::OrderCancel { maker, taker, .. } | Self::OrderFill { maker, taker, .. } => {
                maker == subject || taker == subject
            }
            Self::OrderCreate { user, .. } => *user == sub_account,
            Self::OrderExpire { user, .. } => user == subject,
            Self::OrderCancelMissing { .. } => true,
            Self::FundingPayment { user, .. } => *user == sub_account,
        }
    }
    /// Deserialize drift event by discriminant
    ///
    /// * `disc` - 8 byte event discriminator taken from the start of the payload
    /// * `data` - remaining payload bytes following the discriminator
    ///
    /// Returns `None` for discriminators of unhandled event types and for
    /// order actions that map to no event.
    ///
    /// # Panics
    /// Panics if the payload of a recognised discriminator fails to deserialize.
    fn from_discriminant(
        disc: [u8; 8],
        data: &mut &[u8],
        signature: &str,
        tx_idx: usize,
    ) -> Option<Self> {
        match disc.as_slice() {
            // deserialization should only fail on a breaking protocol change
            OrderActionRecord::DISCRIMINATOR => Self::from_oar(
                OrderActionRecord::deserialize(data).expect("deserializes"),
                signature,
                tx_idx,
            ),
            OrderRecord::DISCRIMINATOR => Self::from_order_record(
                OrderRecord::deserialize(data).expect("deserializes"),
                signature,
                tx_idx,
            ),
            FundingPaymentRecord::DISCRIMINATOR => Some(Self::from_funding_payment_record(
                FundingPaymentRecord::deserialize(data).expect("deserializes"),
                signature,
                tx_idx,
            )),
            _ => {
                debug!(target: LOG_TARGET, "unhandled event: {disc:?}");
                None
            }
        }
    }
    /// Build a `FundingPayment` event from a funding payment record
    fn from_funding_payment_record(
        value: FundingPaymentRecord,
        signature: &str,
        tx_idx: usize,
    ) -> Self {
        Self::FundingPayment {
            amount: value.funding_payment,
            market_index: value.market_index,
            // the record's signed timestamp is exposed as its absolute value
            ts: value.ts.unsigned_abs(),
            user: value.user,
            signature: signature.to_string(),
            tx_idx,
        }
    }
    /// Build an `OrderCreate` event from an order record; always returns `Some`
    fn from_order_record(value: OrderRecord, signature: &str, tx_idx: usize) -> Option<Self> {
        Some(DriftEvent::OrderCreate {
            order: value.order,
            user: value.user,
            ts: value.ts.unsigned_abs(),
            signature: signature.to_string(),
            tx_idx,
        })
    }
    /// Build an event from an order action record
    ///
    /// `Cancel` maps to `OrderExpire` when the explanation is `OrderExpired` and
    /// to `OrderCancel` otherwise, `Fill` maps to `OrderFill`, every other action
    /// yields `None`.
    ///
    /// # Panics
    /// Panics if an expired order record has neither a maker nor a taker order id.
    fn from_oar(value: OrderActionRecord, signature: &str, tx_idx: usize) -> Option<Self> {
        match value.action {
            OrderAction::Cancel => {
                if let OrderActionExplanation::OrderExpired = value.action_explanation {
                    // TODO: would be nice to report the `user_order_id` too...
                    Some(DriftEvent::OrderExpire {
                        fee: value.filler_reward.unwrap_or_default(),
                        // the maker side is preferred when both sides are set
                        order_id: value
                            .maker_order_id
                            .or(value.taker_order_id)
                            .expect("order id set"),
                        ts: value.ts.unsigned_abs(),
                        signature: signature.to_string(),
                        tx_idx,
                        user: value.maker.or(value.taker),
                    })
                } else {
                    Some(DriftEvent::OrderCancel {
                        maker: value.maker,
                        taker: value.taker,
                        // absent order ids are reported as 0
                        maker_order_id: value.maker_order_id.unwrap_or_default(),
                        taker_order_id: value.taker_order_id.unwrap_or_default(),
                        ts: value.ts.unsigned_abs(),
                        signature: signature.to_string(),
                        tx_idx,
                    })
                }
            }
            // absent optional numeric fields are reported as 0
            OrderAction::Fill => Some(DriftEvent::OrderFill {
                maker: value.maker,
                maker_fee: value.maker_fee.unwrap_or_default(),
                maker_order_id: value.maker_order_id.unwrap_or_default(),
                maker_side: value.maker_order_direction,
                taker: value.taker,
                taker_fee: value.taker_fee.unwrap_or_default(),
                taker_order_id: value.taker_order_id.unwrap_or_default(),
                taker_side: value.taker_order_direction,
                base_asset_amount_filled: value.base_asset_amount_filled.unwrap_or_default(),
                quote_asset_amount_filled: value.quote_asset_amount_filled.unwrap_or_default(),
                oracle_price: value.oracle_price,
                market_index: value.market_index,
                market_type: value.market_type,
                ts: value.ts.unsigned_abs(),
                signature: signature.to_string(),
                tx_idx,
            }),
            // Place - parsed from `OrderRecord` event, ignored here due to lack of useful info
            // Expire - never emitted
            // Trigger - unimplemented
            OrderAction::Place | OrderAction::Expire | OrderAction::Trigger => None,
        }
    }
}
637
/// Fixed capacity cache of tx signatures, evicting the oldest entry first
struct TxSignatureCache {
    /// maximum number of signatures retained
    capacity: usize,
    /// cached signatures, for membership checks
    entries: HashSet<String>,
    /// cached signatures in insertion order, oldest at the front
    age: VecDeque<String>,
}

impl TxSignatureCache {
    /// Create an empty cache retaining at most `capacity` signatures
    fn new(capacity: usize) -> Self {
        Self {
            capacity,
            entries: HashSet::<String>::with_capacity_and_hasher(capacity, Default::default()),
            age: VecDeque::with_capacity(capacity),
        }
    }
    /// Return true if signature `x` is currently cached
    fn contains(&self, x: &str) -> bool {
        self.entries.contains(x)
    }
    /// Cache signature `x`, evicting the oldest signature when the cache is full
    ///
    /// Inserting an already cached signature is a no-op, as is inserting into a
    /// cache of capacity 0.
    fn insert(&mut self, x: String) {
        // a duplicate must not be queued twice, otherwise evicting the first
        // copy would drop the signature while it still occupies a slot
        if self.capacity == 0 || self.entries.contains(&x) {
            return;
        }
        // make room first so that exactly `capacity` entries can be held
        if self.age.len() >= self.capacity {
            if let Some(oldest) = self.age.pop_front() {
                self.entries.remove(&oldest);
            }
        }
        self.entries.insert(x.clone());
        self.age.push_back(x);
    }
    /// Remove all cached signatures
    #[cfg(test)]
    fn reset(&mut self) {
        self.entries.clear();
        self.age.clear();
    }
}
671
672#[cfg(test)]
673mod test {
674    use ahash::HashMap;
675    use anchor_lang::prelude::*;
676    use base64::Engine;
677    use futures_util::future::ready;
678    use solana_sdk::{
679        hash::Hash,
680        instruction::{AccountMeta, Instruction},
681        message::{v0, VersionedMessage},
682        pubkey::Pubkey,
683    };
684    use solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta};
685    use tokio::sync::Mutex;
686
687    use super::*;
688    use crate::SdkError;
689
690    #[cfg(feature = "rpc_tests")]
691    #[tokio::test]
692    async fn event_streaming_logs() {
693        use crate::async_utils::retry_policy;
694        let mut event_stream = EventSubscriber::subscribe(
695            "wss://api.devnet.solana.com",
696            Pubkey::from_str("9JtczxrJjPM4J1xooxr2rFXmRivarb4BwjNiBgXDwe2p").unwrap(),
697            retry_policy::never(),
698        )
699        .await
700        .unwrap()
701        .take(5);
702
703        while let Some(event) = event_stream.next().await {
704            dbg!(event);
705        }
706    }
707
708    #[tokio::test]
709    async fn log_stream_handles_jit_proxy_events() {
710        let cache = TxSignatureCache::new(16);
711        let (event_tx, mut event_rx) = channel(16);
712
713        let mut log_stream = LogEventStream {
714            cache: Arc::new(cache.into()),
715            provider: Arc::new(
716                PubsubClient::new("wss://api.devnet.solana.com".into())
717                    .await
718                    .unwrap(),
719            ),
720            sub_account: "GgZkrSFgTAXZn1rNtZ533wpZi6nxx8whJC9bxRESB22c"
721                .try_into()
722                .unwrap(),
723            event_tx,
724            commitment: CommitmentConfig::confirmed(),
725        };
726
727        let logs: Vec<String> = [
728            "Program ComputeBudget111111111111111111111111111111 invoke [1]",
729            "Program ComputeBudget111111111111111111111111111111 success",
730            "Program J1TnP8zvVxbtF5KFp5xRmWuvG9McnhzmBd9XGfCyuxFP invoke [1]",
731            "Program log: Instruction: ArbPerp",
732            "Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH invoke [2]",
733            "Program log: Instruction: PlaceAndTakePerpOrder",
734            "Program log: Invalid Spot 0 Oracle: Stale (oracle_delay=23)",
735            "Program log: 4DRDR8LtbQFOKvplAAAAAAAAGAABAAAAAAAAAAAAAAFGJn8TpIimFlKv8ZWRhmuU81x+ojkf3K4d+++MbslDfAGZcTYAAQEBAM5q/TIAAAABAAAAAAAAAAABAAAAAAAAAAAAAAAAAACTWxEAAAAAAAA=",
736            "Program log: aBNAOFkVAlpOKvplAAAAAEYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8qZQ2DwAAAABMTREAAAAAAADOav0yAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAJlxNgAYAAEBAQAAAQAAAQAAAAAA",
737            "Program log: 4DRDR8LtbQFOKvplAAAAAAIIGAABAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AQAAAAAAAAAAAceaAwAAAAAAAQDOav0yAAAAAQQgzQ4AAAAAAQIjAQAAAAAAAQA+////////AAAAAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AZlxNgABAQEAzmr9MgAAAAEAzmr9MgAAAAEEIM0OAAAAAAHpAf4sI0TDV0Ec0LWHs9mO40bjfKEm3A+yye5HFCQQQQEzPgAAAQABANraQssAAAABANraQssAAAABLJgAOwAAAACTWxEAAAAAAAA=",
738            "Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH consumed 373815 of 1334075 compute units",
739            "Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH success",
740            "Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH invoke [2]",
741            "Program log: Instruction: PlaceAndTakePerpOrder",
742            "Program log: Invalid Spot 0 Oracle: Stale (oracle_delay=23)",
743            "Program log: 4DRDR8LtbQFOKvplAAAAAAAAGAABAAAAAAAAAAAAAAFGJn8TpIimFlKv8ZWRhmuU81x+ojkf3K4d+++MbslDfAGacTYAAQABAM5q/TIAAAABAAAAAAAAAAABAAAAAAAAAAAAAAAAAACTWxEAAAAAAAA=",
744            "Program log: aBNAOFkVAlpOKvplAAAAAEYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8qZQ2DwAAAACAPBEAAAAAAADOav0yAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAJpxNgAYAAEBAQABAAAAAQAAAAAA",
745            "Program log: 4DRDR8LtbQFOKvplAAAAAAIQGAABAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AQAAAAAAAAAAAciaAwAAAAAAAQDgBS0LAAAAAQBYOwMAAAAAAYs/AAAAAAAAAAAB+Ejx//////8AAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AZpxNgABAAEAzmr9MgAAAAEA4AUtCwAAAAEAWDsDAAAAAAAAAAAAAJNbEQAAAAAAAA==",
746            "Program log: 4DRDR8LtbQFOKvplAAAAAAIIGAABAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AQAAAAAAAAAAAcmaAwAAAAAAAQDuZNAnAAAAAYBpgwsAAAAAAV3iAAAAAAAAARhp////////AAAAAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AZpxNgABAAEAzmr9MgAAAAEAzmr9MgAAAAGAwb4OAAAAAAFmQRGN8PRJqt5D5pVvCspbc3f0ZBdTB1Kcw0YfuzxCOAH2/poHAQEBAIjmn+sAAAABAFrDjp4AAAABgPDZLQAAAACTWxEAAAAAAAA=",
747            "Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH consumed 269624 of 934786 compute units",
748            "Program dRiftyHA39MWEi3m9aunc5MzRF1JYuBsbn6VPcn33UH success",
749            "Program log: pnl 792986",
750            "Program J1TnP8zvVxbtF5KFp5xRmWuvG9McnhzmBd9XGfCyuxFP consumed 738458 of 1399850 compute units",
751            "Program J1TnP8zvVxbtF5KFp5xRmWuvG9McnhzmBd9XGfCyuxFP success",
752            ].into_iter().map(Into::into).collect();
753
754        log_stream.process_log(RpcLogsResponse {
755            signature: "2jLk34wWwgecuws9iD9Ug63JdL8kYBePdtcakzG34zEx9KYVYD6HuokxMZYpFw799cJZBcaCMZ47WAxkGJjM7zNC".into(),
756            err: None,
757            logs: logs.clone(),
758        }).await;
759
760        // case 1: jit taker
761        assert_eq!(
762            event_rx.try_recv().expect("one event"),
763            DriftEvent::OrderFill {
764                maker: Some(
765                    "GgZkrSFgTAXZn1rNtZ533wpZi6nxx8whJC9bxRESB22c".try_into().unwrap(),
766                ),
767                maker_fee: -49664,
768                maker_order_id: 15923,
769                maker_side: Some(
770                    PositionDirection::Long,
771                ),
772                taker: Some(
773                    "5iqawn52cdBmsjC4hDegyFnX1iNRTNDV5mRsGzgqbuyD".try_into().unwrap(),
774                ),
775                taker_fee: 74498,
776                taker_order_id: 3568025,
777                taker_side: Some(
778                    PositionDirection::Short,
779                ),
780                base_asset_amount_filled: 219000000000,
781                quote_asset_amount_filled: 248324100,
782                market_index: 24,
783                market_type: MarketType::Perp,
784                oracle_price: 1137555,
785                signature: "2jLk34wWwgecuws9iD9Ug63JdL8kYBePdtcakzG34zEx9KYVYD6HuokxMZYpFw799cJZBcaCMZ47WAxkGJjM7zNC".into(),
786                tx_idx: 9,
787                ts: 1710893646,
788            }
789        );
790        assert!(event_rx.try_recv().is_err()); // no more events
791
792        // case 2: jit maker
793        // reset the cache and account to process the log from maker's side this time
794        log_stream.sub_account = "5iqawn52cdBmsjC4hDegyFnX1iNRTNDV5mRsGzgqbuyD"
795            .try_into()
796            .unwrap();
797        log_stream.cache.write().await.reset();
798
799        log_stream.process_log(RpcLogsResponse {
800            signature: "2jLk34wWwgecuws9iD9Ug63JdL8kYBePdtcakzG34zEx9KYVYD6HuokxMZYpFw799cJZBcaCMZ47WAxkGJjM7zNC".into(),
801            err: None,
802            logs: logs.clone(),
803        }).await;
804
805        assert!(event_rx.try_recv().is_ok()); // place/create
806        assert!(event_rx.try_recv().is_ok()); // fill with match
807        assert!(event_rx.try_recv().is_ok()); // place/create
808        assert!(event_rx.try_recv().is_ok()); // fill with amm
809        assert!(event_rx.try_recv().is_ok()); // fill with match
810        assert!(event_rx.try_recv().is_err()); // no more events
811    }
812
813    #[test]
814    fn test_log() {
815        let result = try_parse_log("Program log: 4DRDR8LtbQH+x7JlAAAAAAIIAAABAbpHl8YM/aWjrjfQ48x0R2DclPigyXtYx+5d/vSVjUIZAQoCAAAAAAAAAaJhIgAAAAAAAQDC6wsAAAAAAZjQCQEAAAAAAWsUAAAAAAAAAWTy////////AAAAAaNzGgMga9TnxjVkycO4bmqSGjK6kP92OrKdZMYqFV+aAS4eKQ4BAQEAHkHaNAAAAAEAwusLAAAAAAGY0AkBAAAAAAFneQwBwHPUIY9ykEdbxsTV7Lh6K+vISfq8nLCTm/rWoAHwCQAAAQABAMLrCwAAAAABAMLrCwAAAAABmNAJAQAAAAA9Zy8FAAAAAAA=", "sig", 0);
816        dbg!(result);
817    }
818
819    #[test]
820    fn parses_jit_proxy_logs() {
821        let cpi_logs = &[
822            "Program log: 4DRDR8LtbQFOKvplAAAAAAAAGAABAAAAAAAAAAAAAAFGJn8TpIimFlKv8ZWRhmuU81x+ojkf3K4d+++MbslDfAGZcTYAAQEBAM5q/TIAAAABAAAAAAAAAAABAAAAAAAAAAAAAAAAAACTWxEAAAAAAAAA",
823            "Program log: aBNAOFkVAlpOKvplAAAAAEYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8qZQ2DwAAAABMTREAAAAAAADOav0yAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAJlxNgAYAAEBAQAAAQAAAQAAAAAA",
824            "Program log: 4DRDR8LtbQFOKvplAAAAAAIIGAABAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AQAAAAAAAAAAAceaAwAAAAAAAQDOav0yAAAAAQQgzQ4AAAAAAQIjAQAAAAAAAQA+////////AAAAAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AZlxNgABAQEAzmr9MgAAAAEAzmr9MgAAAAEEIM0OAAAAAAHpAf4sI0TDV0Ec0LWHs9mO40bjfKEm3A+yye5HFCQQQQEzPgAAAQABANraQssAAAABANraQssAAAABLJgAOwAAAACTWxEAAAAAAAA=",
825            "Program log: 4DRDR8LtbQFOKvplAAAAAAAAGAABAAAAAAAAAAAAAAFGJn8TpIimFlKv8ZWRhmuU81x+ojkf3K4d+++MbslDfAGacTYAAQABAM5q/TIAAAABAAAAAAAAAAABAAAAAAAAAAAAAAAAAACTWxEAAAAAAAA=",
826            "Program log: aBNAOFkVAlpOKvplAAAAAEYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8qZQ2DwAAAACAPBEAAAAAAADOav0yAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAJpxNgAYAAEBAQABAAAAAQAAAAAA",
827            "Program log: 4DRDR8LtbQFOKvplAAAAAAIQGAABAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AQAAAAAAAAAAAciaAwAAAAAAAQDgBS0LAAAAAQBYOwMAAAAAAYs/AAAAAAAAAAAB+Ejx//////8AAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AZpxNgABAAEAzmr9MgAAAAEA4AUtCwAAAAEAWDsDAAAAAAAAAAAAAJNbEQAAAAAAAA==",
828            "Program log: 4DRDR8LtbQFOKvplAAAAAAIIGAABAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AQAAAAAAAAAAAcmaAwAAAAAAAQDuZNAnAAAAAYBpgwsAAAAAAV3iAAAAAAAAARhp////////AAAAAUYmfxOkiKYWUq/xlZGGa5TzXH6iOR/crh3774xuyUN8AZpxNgABAAEAzmr9MgAAAAEAzmr9MgAAAAGAwb4OAAAAAAFmQRGN8PRJqt5D5pVvCspbc3f0ZBdTB1Kcw0YfuzxCOAH2/poHAQEBAIjmn+sAAAABAFrDjp4AAAABgPDZLQAAAACTWxEAAAAAAAA=",
829        ];
830
831        for log in cpi_logs {
832            let result = try_parse_log(log, "sig", 0);
833            dbg!(log, result);
834        }
835    }
836
837    #[tokio::test]
838    async fn polled_event_stream_caching() {
839        let _ = env_logger::try_init();
840        struct MockRpcProvider {
841            tx_responses: HashMap<String, EncodedTransactionWithStatusMeta>,
842            signatures: tokio::sync::Mutex<Vec<String>>,
843        }
844
845        impl MockRpcProvider {
846            async fn add_signatures(&self, signatures: Vec<String>) {
847                let mut all_signatures = self.signatures.lock().await;
848                all_signatures.extend(signatures.into_iter());
849            }
850        }
851
852        impl EventRpcProvider for Arc<MockRpcProvider> {
853            fn get_tx(
854                &self,
855                signature: Signature,
856            ) -> BoxFuture<SdkResult<EncodedTransactionWithStatusMeta>> {
857                ready(
858                    self.tx_responses
859                        .get(signature.to_string().as_str())
860                        .ok_or(SdkError::Deserializing)
861                        .cloned(),
862                )
863                .boxed()
864            }
865            fn get_tx_signatures(
866                &self,
867                _account: Pubkey,
868                after: Option<Signature>,
869                _limit: Option<usize>,
870            ) -> BoxFuture<SdkResult<Vec<String>>> {
871                async move {
872                    let after = after.map(|s| s.to_string());
873                    let mut self_signatures = self.signatures.lock().await;
874                    if after.is_none() {
875                        return Ok(self_signatures.clone());
876                    }
877
878                    if let Some(idx) = self_signatures
879                        .iter()
880                        .position(|s| Some(s) == after.as_ref())
881                    {
882                        if idx > 0 {
883                            // newest -> oldest
884                            *self_signatures = self_signatures[..idx].to_vec();
885                        } else {
886                            self_signatures.clear();
887                        }
888                    }
889
890                    Ok(self_signatures.clone())
891                }
892                .boxed()
893            }
894        }
895
896        let (event_tx, mut event_rx) = channel(16);
897        let sub_account = Pubkey::new_unique();
898        let cache = Arc::new(RwLock::new(TxSignatureCache::new(16)));
899
900        let mut order_events: Vec<(OrderActionRecord, OrderRecord)> = (0..5)
901            .map(|id| {
902                (
903                    get_order_action_record(
904                        id as i64,
905                        OrderAction::Place,
906                        OrderActionExplanation::None,
907                        0,
908                        None,
909                        None,
910                        None,
911                        None,
912                        None,
913                        None,
914                        None,
915                        None,
916                        None,
917                        None,
918                        None,
919                        None,
920                        Some(sub_account.clone()),
921                        Some(Order {
922                            order_id: id,
923                            ..Default::default()
924                        }),
925                        0,
926                        0,
927                    ),
928                    OrderRecord {
929                        ts: id as i64,
930                        user: sub_account,
931                        order: Order {
932                            order_id: id,
933                            ..Default::default()
934                        },
935                    },
936                )
937            })
938            .collect();
939        let signatures: Vec<String> = (0..order_events.len())
940            .map(|_| Signature::new_unique().to_string())
941            .collect();
942        let mut tx_responses = HashMap::<String, EncodedTransactionWithStatusMeta>::default();
943        for s in signatures.iter() {
944            let (oar, or) = order_events.pop().unwrap();
945            tx_responses.insert(
946                s.clone(),
947                make_transaction(
948                    sub_account,
949                    Signature::from_str(s).unwrap(),
950                    Some(vec![
951                        format!("{PROGRAM_LOG}{}", serialize_event(oar)),
952                        format!("{PROGRAM_LOG}{}", serialize_event(or),),
953                    ]),
954                ),
955            );
956        }
957
958        let mock_rpc_provider = Arc::new(MockRpcProvider {
959            tx_responses,
960            signatures: Mutex::new(vec![signatures.first().unwrap().clone()]),
961        });
962
963        tokio::spawn(
964            PolledEventStream {
965                cache: Arc::clone(&cache),
966                provider: Arc::clone(&mock_rpc_provider),
967                sub_account,
968                event_tx,
969            }
970            .stream_fn(),
971        );
972        tokio::time::sleep(Duration::from_secs(1)).await;
973
974        // add 4 new tx signtaures
975        // 1) cached
976        // 2,3) emit events
977        // 4) cached
978        {
979            let mut cache_ = cache.write().await;
980            cache_.insert(signatures[1].clone());
981            cache_.insert(signatures[4].clone());
982        }
983        mock_rpc_provider
984            .add_signatures(signatures[1..].to_vec())
985            .await;
986        tokio::time::sleep(Duration::from_secs(1)).await;
987
988        assert!(event_rx.recv().await.is_some_and(|f| {
989            if let DriftEvent::OrderCreate { order, .. } = f {
990                println!("{}", order.order_id);
991                order.order_id == 1
992            } else {
993                false
994            }
995        }));
996        assert!(event_rx.recv().await.is_some_and(|f| {
997            if let DriftEvent::OrderCreate { order, .. } = f {
998                println!("{}", order.order_id);
999                order.order_id == 2
1000            } else {
1001                false
1002            }
1003        }));
1004        tokio::time::sleep(Duration::from_secs(1)).await;
1005        assert!(event_rx.try_recv().is_err());
1006    }
1007
1008    /// Make transaction with dummy instruction for drift program
1009    fn make_transaction(
1010        account: Pubkey,
1011        signature: Signature,
1012        logs: Option<Vec<String>>,
1013    ) -> EncodedTransactionWithStatusMeta {
1014        let mut meta = TransactionStatusMeta::default();
1015        meta.log_messages = logs;
1016        VersionedTransactionWithStatusMeta {
1017            transaction: VersionedTransaction {
1018                signatures: vec![signature],
1019                message: VersionedMessage::V0(
1020                    v0::Message::try_compile(
1021                        &account,
1022                        &[Instruction {
1023                            program_id: constants::PROGRAM_ID,
1024                            accounts: vec![AccountMeta::new_readonly(constants::PROGRAM_ID, true)],
1025                            data: Default::default(),
1026                        }],
1027                        &[],
1028                        Hash::new_unique(),
1029                    )
1030                    .expect("v0 message"),
1031                ),
1032            },
1033            meta,
1034        }
1035        .encode(UiTransactionEncoding::Base64, Some(0), false)
1036        .unwrap()
1037    }
1038
1039    /// serialize event to string like Drift program log
1040    pub fn serialize_event<T: AnchorSerialize + Discriminator>(event: T) -> String {
1041        let mut data_buf = T::DISCRIMINATOR.to_vec();
1042        event.serialize(&mut data_buf).expect("serializes");
1043        base64::engine::general_purpose::STANDARD.encode(data_buf)
1044    }
1045
1046    pub fn get_order_action_record(
1047        ts: i64,
1048        action: OrderAction,
1049        action_explanation: OrderActionExplanation,
1050        market_index: u16,
1051        filler: Option<Pubkey>,
1052        fill_record_id: Option<u64>,
1053        filler_reward: Option<u64>,
1054        base_asset_amount_filled: Option<u64>,
1055        quote_asset_amount_filled: Option<u64>,
1056        taker_fee: Option<u64>,
1057        maker_rebate: Option<u64>,
1058        referrer_reward: Option<u64>,
1059        quote_asset_amount_surplus: Option<i64>,
1060        spot_fulfillment_method_fee: Option<u64>,
1061        taker: Option<Pubkey>,
1062        taker_order: Option<Order>,
1063        maker: Option<Pubkey>,
1064        maker_order: Option<Order>,
1065        oracle_price: i64,
1066        bit_flags: u8,
1067    ) -> OrderActionRecord {
1068        OrderActionRecord {
1069            bit_flags,
1070            ts,
1071            action,
1072            action_explanation,
1073            market_index,
1074            market_type: if let Some(taker_order) = taker_order {
1075                taker_order.market_type
1076            } else if let Some(maker_order) = maker_order {
1077                maker_order.market_type
1078            } else {
1079                panic!("inalid order");
1080            },
1081            filler,
1082            filler_reward,
1083            fill_record_id,
1084            base_asset_amount_filled,
1085            quote_asset_amount_filled,
1086            taker_fee,
1087            maker_fee: match maker_rebate {
1088                Some(maker_rebate) => Some(maker_rebate as i64),
1089                None => None,
1090            },
1091            referrer_reward: match referrer_reward {
1092                Some(referrer_reward) if referrer_reward > 0 => {
1093                    Some(referrer_reward.try_into().unwrap())
1094                }
1095                _ => None,
1096            },
1097            quote_asset_amount_surplus,
1098            spot_fulfillment_method_fee,
1099            taker,
1100            taker_order_id: taker_order.map(|order| order.order_id),
1101            taker_order_direction: taker_order.map(|order| order.direction),
1102            taker_order_base_asset_amount: taker_order.map(|order| order.base_asset_amount),
1103            taker_order_cumulative_base_asset_amount_filled: taker_order
1104                .map(|order| order.base_asset_amount_filled),
1105            taker_order_cumulative_quote_asset_amount_filled: taker_order
1106                .as_ref()
1107                .map(|order| order.quote_asset_amount_filled),
1108            maker,
1109            maker_order_id: maker_order.map(|order| order.order_id),
1110            maker_order_direction: maker_order.map(|order| order.direction),
1111            maker_order_base_asset_amount: maker_order.map(|order| order.base_asset_amount),
1112            maker_order_cumulative_base_asset_amount_filled: maker_order
1113                .map(|order| order.base_asset_amount_filled),
1114            maker_order_cumulative_quote_asset_amount_filled: maker_order
1115                .map(|order| order.quote_asset_amount_filled),
1116            oracle_price,
1117        }
1118    }
1119}