drift_rs/
swift_order_subscriber.rs

1use std::{
2    cell::LazyCell,
3    time::{SystemTime, UNIX_EPOCH},
4};
5
6use anchor_lang::{
7    prelude::borsh::{self},
8    AnchorDeserialize, AnchorSerialize, InitSpace, Space,
9};
10use arrayvec::ArrayVec;
11use base64::Engine;
12use futures_util::{SinkExt, StreamExt};
13use serde::Deserialize;
14use serde_json::{json, Value};
15use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature};
16use tokio_stream::wrappers::ReceiverStream;
17use tokio_tungstenite::{connect_async, tungstenite::Message};
18
19pub use crate::types::{
20    SignedMsgOrderParamsDelegateMessage as SignedDelegateOrder,
21    SignedMsgOrderParamsMessage as SignedOrder,
22};
23use crate::{
24    constants::MarketExt,
25    types::{Context, MarketId, OrderParams, SdkError, SdkResult},
26    DriftClient, Wallet,
27};
28
29/// Swift message discriminator (Anchor)
30const SWIFT_MSG_PREFIX: LazyCell<[u8; 8]> = LazyCell::new(|| {
31    solana_sdk::hash::hash(b"global:SignedMsgOrderParamsMessage").to_bytes()[..8]
32        .try_into()
33        .unwrap()
34});
35
36/// Swift message discriminator (Anchor)
37const SWIFT_DELEGATE_MSG_PREFIX: LazyCell<[u8; 8]> = LazyCell::new(|| {
38    solana_sdk::hash::hash(b"global:SignedMsgOrderParamsDelegateMessage").to_bytes()[..8]
39        .try_into()
40        .unwrap()
41});
42
/// Swift WebSocket endpoint used for any client context other than `Context::MainNet`
pub const SWIFT_DEVNET_WS_URL: &str = "wss://master.swift.drift.trade";
/// Swift WebSocket endpoint used when the client context is `Context::MainNet`
pub const SWIFT_MAINNET_WS_URL: &str = "wss://swift.drift.trade";

/// `target` passed to every `log` macro call in this module
const LOG_TARGET: &str = "swift";
47
48/// Common fields of signed message types
49pub struct SignedMessageInfo {
50    pub taker_pubkey: Pubkey,
51    pub order_params: OrderParams,
52    pub uuid: [u8; 8],
53    pub slot: Slot,
54}
55
56/// It can be either signed by the authority keypair or an authorized delegate
57#[derive(Clone, Debug, PartialEq, AnchorSerialize, AnchorDeserialize, InitSpace, Copy)]
58pub enum SignedOrderType {
59    /// Swift order signed by authority keypair
60    Authority(SignedOrder),
61    /// Swift order signed by a delegated keypair
62    Delegated(SignedDelegateOrder),
63}
64
65impl SignedOrderType {
66    /// Returns true if this is a delegated signed msg order
67    pub fn is_delegated(&self) -> bool {
68        matches!(self, Self::Delegated(_))
69    }
70    /// Serialize as a borsh buffer
71    ///
72    /// This differs from `AnchorSerialize` as it does _not_ encode the enum byte
73    ///
74    /// DEV: Swift clients do not encode or decode the enum byte
75    pub fn to_borsh(&self) -> ArrayVec<u8, { SignedOrderType::INIT_SPACE - 1 + 8 }> {
76        // SignedOrderType::INIT_SPACE (max variant size) -1 (no enum byte) +8 (anchor discriminator len)
77        let mut buf = ArrayVec::new();
78        match self {
79            Self::Authority(ref x) => {
80                (*SWIFT_MSG_PREFIX).serialize(&mut buf).unwrap();
81                x.serialize(&mut buf).unwrap();
82            }
83            Self::Delegated(ref x) => {
84                (*SWIFT_DELEGATE_MSG_PREFIX).serialize(&mut buf).unwrap();
85                x.serialize(&mut buf).unwrap();
86            }
87        }
88
89        buf
90    }
91
92    pub fn info(&self, taker_authority: &Pubkey) -> SignedMessageInfo {
93        match self {
94            Self::Authority(x) => SignedMessageInfo {
95                taker_pubkey: Wallet::derive_user_account(taker_authority, x.sub_account_id),
96                order_params: x.signed_msg_order_params,
97                uuid: x.uuid,
98                slot: x.slot,
99            },
100            Self::Delegated(x) => SignedMessageInfo {
101                taker_pubkey: x.taker_pubkey,
102                order_params: x.signed_msg_order_params,
103                uuid: x.uuid,
104                slot: x.slot,
105            },
106        }
107    }
108}
109
110/// Order notification from Websocket
111#[derive(Clone, Deserialize)]
112struct OrderNotification<'a> {
113    #[allow(dead_code)]
114    channel: &'a str,
115    order: SignedOrderInfo,
116}
117
118#[derive(Deserialize)]
119struct Heartbeat {
120    #[serde(deserialize_with = "deser_int_str", rename = "message")]
121    ts: u64,
122}
123
124/// Swift order and metadata fresh from the Websocket
125///
126/// This is an off-chain authorization for a taker order.
127/// It may be placed and filled by any willing counter-party, ensuring the time-price bounds
128/// are respected.
129#[derive(Clone, Debug, Deserialize)]
130pub struct SignedOrderInfo {
131    /// Swift order uuid
132    uuid: String,
133    /// Order creation timestamp (unix ms)
134    pub ts: u64,
135    /// The taker authority pubkey
136    #[serde(deserialize_with = "deser_pubkey")]
137    pub taker_authority: Pubkey,
138    /// The authority pubkey that verifies `signature`
139    /// it is either the taker authority or a sub-account delegate
140    #[serde(rename = "signing_authority", deserialize_with = "deser_pubkey")]
141    pub signer: Pubkey,
142    /// hex-ified, borsh encoded signed order message
143    /// this is the signed/verified payload for onchain use
144    #[serde(rename = "order_message", deserialize_with = "deser_signed_msg_type")]
145    order: SignedOrderType,
146    /// Signature over the serialized `order` payload
147    #[serde(rename = "order_signature", deserialize_with = "deser_signature")]
148    pub signature: Signature,
149}
150
151impl SignedOrderInfo {
152    /// The order's UUID (stringified)
153    pub fn order_uuid_str(&self) -> &str {
154        self.uuid.as_ref()
155    }
156    /// The order's UUID (raw)
157    pub fn order_uuid(&self) -> [u8; 8] {
158        match self.order {
159            SignedOrderType::Authority(inner) => inner.uuid,
160            SignedOrderType::Delegated(inner) => inner.uuid,
161        }
162    }
163    /// The drift order params of the message
164    pub fn order_params(&self) -> OrderParams {
165        match self.order {
166            SignedOrderType::Authority(inner) => inner.signed_msg_order_params,
167            SignedOrderType::Delegated(inner) => inner.signed_msg_order_params,
168        }
169    }
170    /// Get the taker sub-account for the order
171    ///
172    /// `taker_authority` - the Authority pubkey of the taker's sub-account
173    pub fn taker_subaccount(&self) -> Pubkey {
174        match self.order {
175            SignedOrderType::Authority(inner) => {
176                Wallet::derive_user_account(&self.taker_authority, inner.sub_account_id)
177            }
178            SignedOrderType::Delegated(inner) => inner.taker_pubkey,
179        }
180    }
181    /// serialize the order message for onchain use e.g. signature verification
182    pub fn encode_for_signing(&self) -> Vec<u8> {
183        hex::encode(self.order.to_borsh()).into_bytes()
184    }
185    /// convert swift order into anchor ix data
186    pub fn to_ix_data(&self) -> Vec<u8> {
187        let signed_msg = self.encode_for_signing();
188        [
189            self.signature.as_ref(),
190            self.signer.as_ref(),
191            &(signed_msg.len() as u16).to_le_bytes(),
192            signed_msg.as_ref(),
193        ]
194        .concat()
195    }
196
197    /// Returns true if the order was signed using delegated authority
198    pub fn using_delegate_signing(&self) -> bool {
199        self.order.is_delegated()
200    }
201
202    pub fn new(
203        uuid: String,
204        taker_authority: Pubkey,
205        signer: Pubkey,
206        order: SignedOrderType,
207        signature: Signature,
208    ) -> Self {
209        Self {
210            uuid,
211            ts: unix_now_ms(),
212            taker_authority,
213            signer,
214            order,
215            signature,
216        }
217    }
218}
219
/// Emits swift orders from the Ws server
///
/// Wraps the receiving half of the channel fed by the background task that
/// `subscribe_swift_orders` spawns.
pub type SwiftOrderStream = ReceiverStream<SignedOrderInfo>;
222
223/// Subscribe to the Swift WebSocket server, authenticate, and listen to new orders
224///
225/// `client` Drift client instance
226/// `markets` markets to listen on for new swift orders
227///
228/// Returns a stream of new Swift order messages
229pub async fn subscribe_swift_orders(
230    client: &DriftClient,
231    markets: &[MarketId],
232) -> SdkResult<SwiftOrderStream> {
233    let base_url = if client.context == Context::MainNet {
234        SWIFT_MAINNET_WS_URL
235    } else {
236        SWIFT_DEVNET_WS_URL
237    };
238    let maker_pubkey = client.wallet().authority().to_string();
239    let (ws_stream, _) = connect_async(format!("{base_url}/ws?pubkey={maker_pubkey}"))
240        .await
241        .map_err(|err| {
242            log::error!(target: LOG_TARGET, "couldn't connect to server: {err:?}");
243            SdkError::WsClient(err)
244        })?;
245
246    let (mut outgoing, mut incoming) = ws_stream.split();
247
248    // handle authentication and subscription
249    while let Some(msg) = incoming.next().await {
250        let msg = msg.map_err(|err| {
251            log::error!(target: LOG_TARGET, "failed reading swift msg: {err:?}");
252            SdkError::WsClient(err)
253        })?;
254
255        if let Message::Text(text) = msg {
256            log::debug!(target: LOG_TARGET, "msg: {text}");
257            let message: Value = serde_json::from_str(&text).expect("Failed to parse message");
258
259            if let Some(err) = message.get("error") {
260                log::error!(target: LOG_TARGET, "swift server error: {err:?}");
261                return Err(SdkError::WebsocketError);
262            }
263
264            // authenticate with Ws server
265            if message["channel"] == "auth" && message.get("nonce").is_some() {
266                let nonce = message["nonce"].as_str().expect("got nonce");
267                let signature = client
268                    .wallet()
269                    .sign_message(nonce.as_bytes())
270                    .expect("infallible");
271                let signature_b64 =
272                    base64::engine::general_purpose::STANDARD.encode(signature.as_ref());
273
274                let auth_message = json!({
275                    "pubkey": maker_pubkey,
276                    "signature": signature_b64,
277                })
278                .to_string();
279                outgoing.send(Message::Text(auth_message.into())).await?;
280                continue;
281            }
282
283            // subscribe to markets
284            if message["channel"] == "auth" && message["message"] == "Authenticated" {
285                let subscribe_msgs: Vec<Result<Message, _>> = markets
286                    .iter()
287                    .map(|m| {
288                        assert!(m.is_perp(), "only perp markets");
289                        let market = client
290                            .program_data()
291                            .perp_market_config_by_index(m.index())
292                            .expect("market exists");
293                        let subscribe_msg = json!({
294                          "action": "subscribe",
295                          "market_type": "perp",
296                          "market_name": market.symbol(),
297                        })
298                        .to_string();
299                        Ok(Message::Text(subscribe_msg.into()))
300                    })
301                    .collect();
302
303                outgoing
304                    .send_all(&mut futures_util::stream::iter(subscribe_msgs))
305                    .await?;
306                break;
307            }
308        }
309    }
310
311    let (tx, rx) = tokio::sync::mpsc::channel(256);
312
313    // handle swift orders
314    tokio::spawn(async move {
315        while let Some(msg) = incoming.next().await {
316            match msg {
317                Ok(Message::Text(ref text)) => {
318                    match serde_json::from_str::<OrderNotification>(text) {
319                        Ok(OrderNotification { channel: _, order }) => {
320                            log::debug!(target: LOG_TARGET, "uuid: {}, latency: {}ms", order.uuid, unix_now_ms().saturating_sub(order.ts));
321
322                            if let Err(err) = tx.try_send(order) {
323                                log::error!(target: LOG_TARGET, "order chan failed: {err:?}");
324                                break;
325                            }
326                        }
327                        Err(err) => {
328                            if text.contains("heartbeat") {
329                                if let Ok(heartbeat) = serde_json::from_str::<Heartbeat>(text) {
330                                    log::debug!(
331                                        target: LOG_TARGET,
332                                        "heartbeat latency: {}",
333                                        unix_now_ms().saturating_sub(heartbeat.ts)
334                                    );
335                                    continue;
336                                }
337                            }
338                            log::error!(target: LOG_TARGET, "{text}. invalid json: {err:?}");
339                            break;
340                        }
341                    }
342                }
343                Ok(Message::Close(_)) => {
344                    log::error!(target: LOG_TARGET, "server closed connection");
345                    break;
346                }
347                Ok(_) => continue,
348                Err(err) => {
349                    log::error!(target: LOG_TARGET, "failed reading swift msg: {err:?}");
350                    break;
351                }
352            }
353        }
354    });
355
356    Ok(ReceiverStream::new(rx))
357}
358
/// Current wall-clock time as milliseconds since the unix epoch
///
/// Never panics: a system clock set before the epoch yields `0`, and a value that does
/// not fit in a `u64` saturates to `u64::MAX`.
fn unix_now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
365
366fn deser_pubkey<'de, D>(deserializer: D) -> Result<Pubkey, D::Error>
367where
368    D: serde::de::Deserializer<'de>,
369{
370    let s: &str = serde::de::Deserialize::deserialize(deserializer)?;
371    Ok(s.parse().expect("base58 pubkey"))
372}
373
374fn deser_signature<'de, D>(deserializer: D) -> Result<Signature, D::Error>
375where
376    D: serde::de::Deserializer<'de>,
377{
378    let s: &str = serde::de::Deserialize::deserialize(deserializer)?;
379    Ok(Signature::try_from(base64::engine::general_purpose::STANDARD.decode(s).unwrap()).unwrap())
380}
381
382fn deser_int_str<'de, D>(deserializer: D) -> Result<u64, D::Error>
383where
384    D: serde::de::Deserializer<'de>,
385{
386    let s: &str = serde::de::Deserialize::deserialize(deserializer)?;
387    Ok(s.parse().unwrap())
388}
389
390/// Deserialize hex-ified, borsh bytes as a `SignedOrderType`
391pub fn deser_signed_msg_type<'de, D>(deserializer: D) -> Result<SignedOrderType, D::Error>
392where
393    D: serde::Deserializer<'de>,
394{
395    let payload: &[u8] = serde::Deserialize::deserialize(deserializer)?;
396    if payload.len() % 2 != 0 {
397        return Err(serde::de::Error::custom("Hex string length must be even"));
398    }
399
400    // decode expecting the largest possible variant
401    let mut borsh_buf = [0u8; SignedDelegateOrder::INIT_SPACE + 8];
402
403    hex::decode_to_slice(payload, &mut borsh_buf[..payload.len() / 2])
404        .map_err(serde::de::Error::custom)?;
405
406    // this is basically the same as if we derived AnchorDeserialize on `SignedOrderType` _expect_ it does not
407    // add a u8 to distinguish the enum
408    if borsh_buf[..8] == *SWIFT_DELEGATE_MSG_PREFIX {
409        AnchorDeserialize::deserialize(&mut &borsh_buf[8..])
410            .map(SignedOrderType::Delegated)
411            .map_err(serde::de::Error::custom)
412    } else {
413        AnchorDeserialize::deserialize(&mut &borsh_buf[8..])
414            .map(SignedOrderType::Authority)
415            .map_err(serde::de::Error::custom)
416    }
417}
418
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{
        MarketType, OrderTriggerCondition, OrderType, PositionDirection, PostOnlyParam,
    };

    /// An authority-signed notification deserializes with the expected metadata and params
    #[test]
    fn test_swift_order_deser() {
        let raw = r#"{
            "channel":"signed_orders_perp_1",
            "order":{
                "market_index":1,
                "market_type":"perp",
                "order_message":"b9c165ffdf70594d0001010080841e00000000000000000000000000010000000000000000013201a4e99abc16000000011ab2f982160000000300900f84150000000072753959424c52740000",
                "order_signature":"FIgxWlW+C0abvtE8esSko7At1YGM8h66T0u5lJpwXirW63CuvEllVWZ68NNVFsaqcj4jqgQInXUnLPjIf/PQDA==",
                "signing_authority":"4rmhwytmKH1XsgGAUyUUH7U64HS5FtT6gM8HGKAfwcFE",
                "taker_authority":"DxoRJ4f5XRMvXU9SGuM4ZziBFUxbhB3ubur5sVZEvue2",
                "ts":1739518796400,
                "uuid":"ru9YBLRt"
            }
        }"#;
        let notification: OrderNotification = serde_json::from_str(raw).unwrap();
        let order_info = notification.order;

        let expected_signer: Pubkey = "4rmhwytmKH1XsgGAUyUUH7U64HS5FtT6gM8HGKAfwcFE"
            .parse()
            .unwrap();
        let expected_taker: Pubkey = "DxoRJ4f5XRMvXU9SGuM4ZziBFUxbhB3ubur5sVZEvue2"
            .parse()
            .unwrap();
        assert_eq!(order_info.signer, expected_signer);
        assert_eq!(order_info.taker_authority, expected_taker);
        assert_eq!(order_info.ts, 1739518796400);
        assert_eq!(order_info.uuid, "ru9YBLRt");

        let params = order_info.order_params();
        assert_eq!(params.market_index, 1);
        assert_eq!(params.market_type, MarketType::Perp);
    }

    /// Re-encoding a deserialized order reproduces the hex payload that was received
    #[test]
    fn test_swift_order_encode_for_signing() {
        let raw = "{\"channel\":\"swift_orders_perp_2\",\"order\":{\"market_index\":2,\"market_type\":\"perp\",\"order_message\":\"c8d5a65e2234f55d0001010080841e0000000000000000000000000002000000000000000001320124c6aa950000000001786b2f94000000000000bb64a9150000000074735730364f6d380000\",\"order_signature\":\"SaOaLJ1i0MqZ2cXdp00jGe2EJFa32eOfiQynFU7mclhT86yhIa4/tWXq7r6l7QPN0Jl6frfsZl0nNOvKZxZpAA==\",\"signing_authority\":\"4rmhwytmKH1XsgGAUyUUH7U64HS5FtT6gM8HGKAfwcFE\",\"taker_authority\":\"4rmhwytmKH1XsgGAUyUUH7U64HS5FtT6gM8HGKAfwcFE\",\"ts\":1740456840770,\"uuid\":\"tsW06Om8\"}}";
        let notification: OrderNotification = serde_json::from_str(raw).unwrap();
        let encoded = notification.order.encode_for_signing();
        assert_eq!(
            encoded.as_slice(),
            b"c8d5a65e2234f55d0001010080841e0000000000000000000000000002000000000000000001320124c6aa950000000001786b2f94000000000000bb64a9150000000074735730364f6d380000"
        );
    }

    /// A delegate-signed notification decodes into the `Delegated` variant field-for-field
    #[test]
    fn deserialize_incoming_signed_message_delegated() {
        let raw = serde_json::json!({
            "channel": "swift_orders_perp_2",
            "order": {
                "market_index": 2,
                "market_type": "perp",
                "order_message": "42656638c7259e230001010080841e00000000000000000000000000020000000000000000013201bb60507d000000000117c0127c00000000395311d51c1b87fd56c3b5872d1041111e51f399b12d291d981a0ea383407295272108160000000073386c754a4c5a650000",
                "order_signature": "9G8luwFfeAc25HwXCgaUjrKv6yJHcMFDq4Z4uPXqom5mhwZ63YU5g7p07Kxe/AKSt5A/9OPDh3nN/c9IHjkCDA==",
                "taker_authority": "4rmhwytmKH1XsgGAUyUUH7U64HS5FtT6gM8HGKAfwcFE",
                "signing_authority": "GiMXQkJXLVjScmQDkoLJShBJpTh9SDPvT2AZQq8NyEBf",
                "ts": 1739518796400_u64,
                "uuid":"s8luJLZe"
            }
        })
        .to_string();
        let notification: OrderNotification<'_> =
            serde_json::from_str(raw.as_str()).expect("deserializes");
        let order_info = notification.order;

        assert_eq!(
            order_info.signer,
            solana_sdk::pubkey!("GiMXQkJXLVjScmQDkoLJShBJpTh9SDPvT2AZQq8NyEBf")
        );
        assert_eq!(
            order_info.taker_authority,
            solana_sdk::pubkey!("4rmhwytmKH1XsgGAUyUUH7U64HS5FtT6gM8HGKAfwcFE")
        );
        assert_eq!(order_info.order_uuid_str(), "s8luJLZe");

        let SignedOrderType::Delegated(decoded) = order_info.order else {
            panic!("unexpected variant");
        };

        let expected_params = OrderParams {
            order_type: OrderType::Market,
            market_type: MarketType::Perp,
            direction: PositionDirection::Short,
            user_order_id: 0,
            base_asset_amount: 2000000,
            price: 0,
            market_index: 2,
            reduce_only: false,
            post_only: PostOnlyParam::None,
            immediate_or_cancel: false,
            max_ts: None,
            trigger_price: None,
            trigger_condition: OrderTriggerCondition::Above,
            oracle_price_offset: None,
            auction_duration: Some(50),
            auction_start_price: Some(2102419643),
            auction_end_price: Some(2081603607),
        };
        let expected = SignedDelegateOrder {
            signed_msg_order_params: expected_params,
            taker_pubkey: solana_sdk::pubkey!("4rmhwytmKH1XsgGAUyUUH7U64HS5FtT6gM8HGKAfwcFE"),
            slot: 369631527,
            uuid: [115, 56, 108, 117, 74, 76, 90, 101],
            take_profit_order_params: None,
            stop_loss_order_params: None,
        };
        assert_eq!(decoded, expected);
    }
}
533}