Skip to main content

boundless_market/
order_stream_client.rs

1// Copyright 2026 Boundless Foundation, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt::Display, pin::Pin};
16
17use alloy::{
18    primitives::{Address, Signature, U256},
19    signers::{Error as SignerErr, Signer},
20};
21use alloy_primitives::B256;
22use anyhow::{Context, Result};
23use async_stream::stream;
24use chrono::{DateTime, Utc};
25use futures_util::{SinkExt, Stream, StreamExt};
26use reqwest::Url;
27use serde::{Deserialize, Serialize};
28use siwe::Message as SiweMsg;
29use thiserror::Error;
30use time::OffsetDateTime;
31use tokio::net::TcpStream;
32use tokio_tungstenite::{
33    connect_async, tungstenite, tungstenite::client::IntoClientRequest, MaybeTlsStream,
34    WebSocketStream,
35};
36use utoipa::ToSchema;
37
38use crate::contracts::{ProofRequest, RequestError};
39
/// HTTP path used to submit a signed order (requested with `POST`).
pub const ORDER_SUBMISSION_PATH: &str = "/api/v1/submit_order";
/// HTTP path of the order listing; per-request lookups append `/{id}` to it.
pub const ORDER_LIST_PATH: &str = "/api/v1/orders";
/// HTTP path of the v2 order listing, which paginates with a cursor.
pub const ORDER_LIST_PATH_V2: &str = "/api/v2/orders";
/// HTTP path prefix for requesting an authentication nonce.
///
/// The trailing slash matters: the address is joined onto this path as its last segment.
pub const AUTH_GET_NONCE: &str = "/api/v1/nonce/";
/// HTTP path of the health check.
pub const HEALTH_CHECK: &str = "/api/v1/health";
/// Path of the order WebSocket endpoint.
pub const ORDER_WS_PATH: &str = "/ws/v1/orders";
52
53/// Error body for API responses
54#[derive(Debug, Deserialize, Serialize, ToSchema)]
55pub struct ErrMsg {
56    /// Error type enum
57    pub r#type: String,
58    /// Error message body
59    pub msg: String,
60}
61impl ErrMsg {
62    /// Create a new error message.
63    pub fn new(r#type: &str, msg: &str) -> Self {
64        Self { r#type: r#type.into(), msg: msg.into() }
65    }
66}
67impl std::fmt::Display for ErrMsg {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        write!(f, "error_type: {} msg: {}", self.r#type, self.msg)
70    }
71}
72
73/// Error type for the Order
74#[derive(Error, Debug)]
75#[non_exhaustive]
76pub enum OrderError {
77    #[error("invalid signature: {0}")]
78    /// Invalid signature error.
79    InvalidSignature(SignerErr),
80    #[error("request error: {0}")]
81    /// Request error.
82    RequestError(#[from] RequestError),
83}
84
85/// Order struct, containing a ProofRequest and its Signature
86///
87/// The contents of this struct match the calldata of the `submitOrder` function in the `BoundlessMarket` contract.
88#[derive(Serialize, Deserialize, ToSchema, Debug, Clone, PartialEq)]
89pub struct Order {
90    /// Order request
91    #[schema(value_type = Object)]
92    pub request: ProofRequest,
93    /// Request digest
94    #[schema(value_type = Object)]
95    pub request_digest: B256,
96    /// Order signature
97    // TODO: This should not be Signature. It should be Bytes or Vec<u8>.
98    #[schema(value_type = Object)]
99    pub signature: Signature,
100}
101
102/// Order data + order-stream id
103#[derive(Serialize, Deserialize, ToSchema, Debug, Clone)]
104pub struct OrderData {
105    /// Order stream id
106    pub id: i64,
107    /// Order data
108    pub order: Order,
109    /// Time the order was submitted
110    #[schema(value_type = String)]
111    pub created_at: DateTime<Utc>,
112}
113
114/// Nonce object for authentication to order-stream websocket
115#[derive(Serialize, Deserialize, ToSchema, Debug, Clone)]
116pub struct Nonce {
117    /// Nonce hex encoded
118    pub nonce: String,
119}
120
121/// Response for submitting a new order
122#[derive(Serialize, Deserialize, ToSchema, Debug, Clone)]
123pub struct SubmitOrderRes {
124    /// Status of the order submission
125    pub status: String,
126    /// Request ID submitted
127    #[schema(value_type = Object)]
128    pub request_id: U256,
129}
130
131/// Sort direction for listing orders
132#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
133#[serde(rename_all = "lowercase")]
134pub enum SortDirection {
135    /// Ascending order (oldest first)
136    Asc,
137    /// Descending order (newest first)
138    Desc,
139}
140
141/// Response for v2 list orders API with pagination info
142#[derive(Serialize, Deserialize, ToSchema, Debug, Clone)]
143pub struct ListOrdersV2Response {
144    /// List of orders
145    pub orders: Vec<OrderData>,
146    /// Cursor for the next page (if available)
147    pub next_cursor: Option<String>,
148    /// Whether there are more results available
149    pub has_more: bool,
150}
151
152impl Order {
153    /// Create a new Order
154    pub fn new(request: ProofRequest, request_digest: B256, signature: Signature) -> Self {
155        Self { request, request_digest, signature }
156    }
157
158    /// Validate the Order
159    pub fn validate(&self, market_address: Address, chain_id: u64) -> Result<(), OrderError> {
160        self.request.validate()?;
161        let hash = self.request.signing_hash(market_address, chain_id)?;
162        if hash != self.request_digest {
163            return Err(OrderError::RequestError(RequestError::DigestMismatch));
164        }
165        self.request.verify_signature(
166            &self.signature.as_bytes().into(),
167            market_address,
168            chain_id,
169        )?;
170        Ok(())
171    }
172}
173
174/// Authentication message for connecting to order-stream websock
175#[derive(Deserialize, Serialize, ToSchema, Debug, Clone)]
176pub struct AuthMsg {
177    /// SIWE message body
178    #[schema(value_type = Object)]
179    message: SiweMsg,
180    /// SIWE Signature of `message` field
181    #[schema(value_type = Object)]
182    signature: Signature,
183}
184
185/// VersionInfo struct for SIWE message
186#[derive(Deserialize, Serialize, ToSchema, Debug, Clone)]
187pub struct VersionInfo {
188    /// Version of the Boundless client
189    pub version: String,
190    /// Git hash of the Boundless client
191    pub git_hash: String,
192}
193
194impl Display for VersionInfo {
195    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196        write!(f, "Boundless Order Stream\nVersion={}\nGit hash={}", self.version, self.git_hash)
197    }
198}
199
200impl From<&SiweMsg> for VersionInfo {
201    fn from(msg: &SiweMsg) -> Self {
202        let mut version = "unknown".to_string();
203        let mut git_hash = "unknown".to_string();
204        if let Some(statement) = &msg.statement {
205            let parts: Vec<&str> = statement.split(':').collect();
206            if parts.len() == 3 && parts[0] == "Boundless Order Stream" {
207                version = parts[1].to_string();
208                git_hash = parts[2].to_string();
209            }
210        }
211        Self { version, git_hash }
212    }
213}
214
215impl From<&AuthMsg> for VersionInfo {
216    fn from(auth_msg: &AuthMsg) -> Self {
217        VersionInfo::from(&auth_msg.message)
218    }
219}
220
221impl AuthMsg {
222    /// Creates a new authentication message from a nonce, origin, signer
223    pub async fn new(nonce: Nonce, origin: &Url, signer: &impl Signer) -> Result<Self> {
224        let version = env!("CARGO_PKG_VERSION");
225        let git_hash = option_env!("BOUNDLESS_GIT_HASH").unwrap_or("unknown");
226        let message = format!(
227            "{} wants you to sign in with your Ethereum account:\n{}\n\nBoundless Order Stream:{version}:{git_hash}\n\nURI: {}\nVersion: 1\nChain ID: 1\nNonce: {}\nIssued At: {}",
228            origin.authority(), signer.address(), origin, nonce.nonce, Utc::now().to_rfc3339(),
229        );
230        let message: SiweMsg = message.parse()?;
231
232        let signature = signer
233            .sign_hash(&message.eip191_hash().context("Failed to generate eip191 hash")?.into())
234            .await?;
235
236        Ok(Self { message, signature })
237    }
238
239    /// Verify a [AuthMsg] message + signature
240    pub async fn verify(&self, domain: &str, nonce: &str) -> Result<()> {
241        let opts = siwe::VerificationOpts {
242            domain: Some(domain.parse().context("Invalid domain")?),
243            nonce: Some(nonce.into()),
244            timestamp: Some(OffsetDateTime::now_utc()),
245        };
246
247        self.message
248            .verify(&self.signature.as_bytes(), &opts)
249            .await
250            .context("Failed to verify SIWE message")
251    }
252
253    /// [AuthMsg] address in alloy format
254    pub fn address(&self) -> Address {
255        Address::from(self.message.address)
256    }
257
258    /// Return the version info from the message
259    pub fn version_info(&self) -> VersionInfo {
260        VersionInfo::from(self)
261    }
262}
263
264/// Client for interacting with the order stream server
265#[derive(Clone, Debug)]
266pub struct OrderStreamClient {
267    /// HTTP client
268    pub client: reqwest::Client,
269    /// Base URL of the order stream server
270    pub base_url: Url,
271    /// Address of the market contract
272    pub boundless_market_address: Address,
273    /// Chain ID of the network
274    pub chain_id: u64,
275    /// Optional API key for authenticated requests
276    pub api_key: Option<String>,
277}
278
279impl OrderStreamClient {
280    /// Create a new client
281    pub fn new(base_url: Url, boundless_market_address: Address, chain_id: u64) -> Self {
282        Self {
283            client: reqwest::Client::new(),
284            base_url,
285            boundless_market_address,
286            chain_id,
287            api_key: None,
288        }
289    }
290
291    /// Create a new client with API key
292    pub fn new_with_api_key(
293        base_url: Url,
294        boundless_market_address: Address,
295        chain_id: u64,
296        api_key: String,
297    ) -> Self {
298        Self {
299            client: reqwest::Client::new(),
300            base_url,
301            boundless_market_address,
302            chain_id,
303            api_key: Some(api_key),
304        }
305    }
306
307    /// Helper to add API key header to request if configured
308    fn add_api_key_header(&self, request: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
309        if let Some(api_key) = &self.api_key {
310            request.header("x-api-key", api_key)
311        } else {
312            request
313        }
314    }
315
316    /// Submit a proof request to the order stream server
317    pub async fn submit_request(
318        &self,
319        request: &ProofRequest,
320        signer: &impl Signer,
321    ) -> Result<Order> {
322        let url = self.base_url.join(ORDER_SUBMISSION_PATH)?;
323        let signature =
324            request.sign_request(signer, self.boundless_market_address, self.chain_id).await?;
325        let request_digest = request.signing_hash(self.boundless_market_address, self.chain_id)?;
326        let order = Order { request: request.clone(), request_digest, signature };
327        order.validate(self.boundless_market_address, self.chain_id)?;
328        let order_json = serde_json::to_value(&order)?;
329        let response = self
330            .add_api_key_header(
331                self.client.post(url).header("Content-Type", "application/json").json(&order_json),
332            )
333            .send()
334            .await?;
335
336        // Check for any errors in the response
337        if let Err(err) = response.error_for_status_ref() {
338            let error_message = match response.json::<serde_json::Value>().await {
339                Ok(json_body) => {
340                    json_body["msg"].as_str().unwrap_or("Unknown server error").to_string()
341                }
342                Err(_) => "Failed to read server error message".to_string(),
343            };
344
345            return Err(anyhow::Error::new(err).context(error_message));
346        }
347
348        Ok(order)
349    }
350
351    /// Fetch an order from the order stream server.
352    ///
353    /// If multiple orders are found, the `request_digest` must be provided to select the correct order.
354    pub async fn fetch_order(&self, id: U256, request_digest: Option<B256>) -> Result<Order> {
355        let url = self.base_url.join(&format!("{ORDER_LIST_PATH}/{id}"))?;
356        let response = self.add_api_key_header(self.client.get(url)).send().await?;
357
358        if !response.status().is_success() {
359            let error_message = match response.json::<serde_json::Value>().await {
360                Ok(json_body) => {
361                    json_body["msg"].as_str().unwrap_or("Unknown server error").to_string()
362                }
363                Err(_) => "Failed to read server error message".to_string(),
364            };
365
366            return Err(anyhow::Error::msg(error_message));
367        }
368
369        let order_data: Vec<OrderData> = response.json().await?;
370        let orders: Vec<Order> = order_data.into_iter().map(|data| data.order).collect();
371        if orders.is_empty() {
372            return Err(anyhow::Error::msg("No order found"));
373        } else if orders.len() == 1 {
374            return Ok(orders[0].clone());
375        }
376        match request_digest {
377            Some(digest) => {
378                for order in orders {
379                    if order.request_digest == digest {
380                        return Ok(order);
381                    }
382                }
383                Err(anyhow::Error::msg("No order found"))
384            }
385            None => {
386                Err(anyhow::Error::msg("Multiple orders found, please provide a request digest"))
387            }
388        }
389    }
390
391    /// Fetch an order with its creation timestamp from the order stream server.
392    ///
393    /// Returns both the Order and the timestamp when it was created.
394    /// If multiple orders are found, the `request_digest` must be provided to select the correct order.
395    pub async fn fetch_order_with_timestamp(
396        &self,
397        id: U256,
398        request_digest: Option<B256>,
399    ) -> Result<(Order, DateTime<Utc>)> {
400        let url = self.base_url.join(&format!("{ORDER_LIST_PATH}/{id}"))?;
401        let response = self.add_api_key_header(self.client.get(url)).send().await?;
402
403        if !response.status().is_success() {
404            let error_message = match response.json::<serde_json::Value>().await {
405                Ok(json_body) => {
406                    json_body["msg"].as_str().unwrap_or("Unknown server error").to_string()
407                }
408                Err(_) => "Failed to read server error message".to_string(),
409            };
410
411            return Err(anyhow::Error::msg(error_message));
412        }
413
414        let order_data: Vec<OrderData> = response.json().await?;
415        if order_data.is_empty() {
416            return Err(anyhow::Error::msg("No order found"));
417        } else if order_data.len() == 1 {
418            let data = &order_data[0];
419            return Ok((data.order.clone(), data.created_at));
420        }
421        match request_digest {
422            Some(digest) => {
423                for data in order_data {
424                    if data.order.request_digest == digest {
425                        return Ok((data.order, data.created_at));
426                    }
427                }
428                Err(anyhow::Error::msg("No order found"))
429            }
430            None => {
431                Err(anyhow::Error::msg("Multiple orders found, please provide a request digest"))
432            }
433        }
434    }
435
436    /// Get the nonce from the order stream service for websocket auth
437    pub async fn get_nonce(&self, address: Address) -> Result<Nonce> {
438        let url = self.base_url.join(AUTH_GET_NONCE)?.join(&address.to_string())?;
439        let res = self.add_api_key_header(self.client.get(url)).send().await?;
440        if !res.status().is_success() {
441            anyhow::bail!("Http error {} fetching nonce", res.status())
442        }
443        let nonce = res.json().await?;
444
445        Ok(nonce)
446    }
447
448    /// List orders sorted by creation time descending (most recent first)
449    ///
450    /// Returns orders created after the given timestamp (if provided), up to the specified limit.
451    /// If `after` is None, returns the most recent orders.
452    pub async fn list_orders_by_creation(
453        &self,
454        after: Option<DateTime<Utc>>,
455        limit: u64,
456    ) -> Result<Vec<OrderData>> {
457        let mut url = self.base_url.join(ORDER_LIST_PATH)?;
458
459        {
460            let mut query = url.query_pairs_mut();
461            query.append_pair("sort", "desc");
462            query.append_pair("limit", &limit.to_string());
463            if let Some(ts) = after {
464                query.append_pair("after", &ts.to_rfc3339());
465            }
466        }
467
468        let response = self.add_api_key_header(self.client.get(url)).send().await?;
469
470        if !response.status().is_success() {
471            let error_message = match response.json::<serde_json::Value>().await {
472                Ok(json_body) => {
473                    json_body["msg"].as_str().unwrap_or("Unknown server error").to_string()
474                }
475                Err(_) => "Failed to read server error message".to_string(),
476            };
477
478            return Err(anyhow::Error::msg(error_message));
479        }
480
481        let orders: Vec<OrderData> = response.json().await?;
482        Ok(orders)
483    }
484
485    /// List orders with cursor-based pagination and flexible filtering (v2)
486    ///
487    /// Provides cursor-based pagination for stable results, bidirectional sorting,
488    /// and timestamp range filtering. This is the recommended method for paginating
489    /// through orders as it handles concurrent inserts correctly.
490    ///
491    /// # Arguments
492    /// * `cursor` - Opaque cursor string from previous response for pagination
493    /// * `limit` - Maximum number of orders to return
494    /// * `sort` - Sort direction (Asc for oldest first, Desc for newest first)
495    /// * `before` - Optional timestamp to filter orders created before this time (EXCLUSIVE: `created_at < before`)
496    /// * `after` - Optional timestamp to filter orders created after this time (EXCLUSIVE: `created_at > after`)
497    ///
498    /// # Boundary Semantics
499    /// Both `before` and `after` parameters use exclusive comparison operators:
500    /// - `before`: Returns orders where `created_at < before` (does NOT include orders at the exact timestamp)
501    /// - `after`: Returns orders where `created_at > after` (does NOT include orders at the exact timestamp)
502    ///
503    /// To include orders at a specific timestamp boundary, add/subtract a small time delta (e.g., 1 second).
504    pub async fn list_orders_v2(
505        &self,
506        cursor: Option<String>,
507        limit: Option<u64>,
508        sort: Option<SortDirection>,
509        before: Option<DateTime<Utc>>,
510        after: Option<DateTime<Utc>>,
511    ) -> Result<ListOrdersV2Response> {
512        let mut url = self.base_url.join(ORDER_LIST_PATH_V2)?;
513
514        {
515            let mut query = url.query_pairs_mut();
516            if let Some(cursor_str) = cursor {
517                query.append_pair("cursor", &cursor_str);
518            }
519            if let Some(limit_val) = limit {
520                query.append_pair("limit", &limit_val.to_string());
521            }
522            if let Some(sort_val) = sort {
523                let sort_str = match sort_val {
524                    SortDirection::Asc => "asc",
525                    SortDirection::Desc => "desc",
526                };
527                query.append_pair("sort", sort_str);
528            }
529            if let Some(ts) = before {
530                query.append_pair("before", &ts.to_rfc3339());
531            }
532            if let Some(ts) = after {
533                query.append_pair("after", &ts.to_rfc3339());
534            }
535        }
536
537        let response = self.add_api_key_header(self.client.get(url)).send().await?;
538
539        if !response.status().is_success() {
540            let error_message = match response.json::<serde_json::Value>().await {
541                Ok(json_body) => {
542                    json_body["msg"].as_str().unwrap_or("Unknown server error").to_string()
543                }
544                Err(_) => "Failed to read server error message".to_string(),
545            };
546
547            return Err(anyhow::Error::msg(error_message));
548        }
549
550        let response_data: ListOrdersV2Response = response.json().await?;
551        Ok(response_data)
552    }
553
554    /// Return a WebSocket stream connected to the order stream server
555    ///
556    /// An authentication message is sent to the server via the `X-Auth-Data` header.
557    /// The authentication message must contain a valid claim of an address holding a (pre-configured)
558    /// minimum balance on the boundless market in order to connect to the server.
559    /// Only one connection per address is allowed.
560    pub async fn connect_async(
561        &self,
562        signer: &impl Signer,
563    ) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
564        let nonce = self
565            .get_nonce(signer.address())
566            .await
567            .context("Failed to fetch nonce from order-stream")?;
568
569        let auth_msg = AuthMsg::new(nonce, &self.base_url, signer).await?;
570
571        // Serialize the `AuthMsg` to JSON
572        let auth_json =
573            serde_json::to_string(&auth_msg).context("failed to serialize auth message")?;
574
575        // Construct the WebSocket URL
576        let host = self.base_url.host().context("missing host")?.to_string();
577        // Select TLS vs not
578        let ws_scheme = if self.base_url.scheme() == "https" { "wss" } else { "ws" };
579
580        let ws_url = match self.base_url.port() {
581            Some(port) => format!("{ws_scheme}://{host}:{port}{ORDER_WS_PATH}"),
582            None => format!("{ws_scheme}://{host}{ORDER_WS_PATH}"),
583        };
584
585        // Create the WebSocket request
586        let mut request =
587            ws_url.clone().into_client_request().context("failed to create request")?;
588        request
589            .headers_mut()
590            .insert("X-Auth-Data", auth_json.parse().context("failed to parse auth message")?);
591
592        // Add API key header if configured
593        if let Some(api_key) = &self.api_key {
594            request
595                .headers_mut()
596                .insert("x-api-key", api_key.parse().context("failed to parse api key")?);
597        }
598
599        // Connect to the WebSocket server and return the socket
600        let (socket, _) = match connect_async(request).await {
601            Ok(res) => res,
602            Err(tokio_tungstenite::tungstenite::Error::Http(err)) => {
603                let http_err = if let Some(http_body) = err.body() {
604                    String::from_utf8_lossy(http_body)
605                } else {
606                    "Empty http error body".into()
607                };
608                anyhow::bail!(
609                    "Failed to connect to ws endpoint ({}): {} {}",
610                    ws_url,
611                    self.base_url,
612                    http_err
613                );
614            }
615            Err(err) => {
616                anyhow::bail!(
617                    "Failed to connect to ws endpoint ({}): {} {}",
618                    ws_url,
619                    self.base_url,
620                    err
621                );
622            }
623        };
624        Ok(socket)
625    }
626}
627
628/// Stream of Order messages from a WebSocket
629///
630/// This function takes a WebSocket stream and returns a stream of `Order` messages.
631/// Example usage:
632/// ```no_run
633/// use alloy::signers::Signer;
634/// use boundless_market::order_stream_client::{OrderStreamClient, order_stream, OrderData};
635/// use futures_util::StreamExt;
636/// async fn example_stream(client: OrderStreamClient, signer: &impl Signer) {
637///     let socket = client.connect_async(signer).await.unwrap();
638///     let mut order_stream = order_stream(socket);
639///     while let Some(order) = order_stream.next().await {
640///         println!("Received order: {:?}", order)
641///     }
642/// }
643/// ```
644#[allow(clippy::type_complexity)]
645pub fn order_stream(
646    mut socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
647) -> Pin<Box<dyn Stream<Item = OrderData> + Send>> {
648    Box::pin(stream! {
649        // Create a ping interval - configurable via environment variable
650        let ping_duration = match std::env::var("ORDER_STREAM_CLIENT_PING_MS") {
651            Ok(ms) => match ms.parse::<u64>() {
652                Ok(ms) => {
653                    tracing::debug!("Using custom ping interval of {}ms", ms);
654                    tokio::time::Duration::from_millis(ms)
655                },
656                Err(_) => {
657                    tracing::warn!("Invalid ORDER_STREAM_CLIENT_PING_MS value: {}, using default", ms);
658                    tokio::time::Duration::from_secs(30)
659                }
660            },
661            Err(_) => tokio::time::Duration::from_secs(30),
662        };
663
664        let mut ping_interval = tokio::time::interval(ping_duration);
665        // Track the last ping we sent
666        let mut ping_data: Option<Vec<u8>> = None;
667
668        loop {
669            tokio::select! {
670                // Handle incoming messages
671                msg_result = socket.next() => {
672                    match msg_result {
673                        Some(Ok(tungstenite::Message::Text(msg))) => {
674                            match serde_json::from_str::<OrderData>(&msg) {
675                                Ok(order) => yield order,
676                                Err(err) => {
677                                    tracing::warn!("Failed to parse order: {:?}", err);
678                                    continue;
679                                }
680                            }
681                        }
682                        // Reply to Ping's inline
683                        Some(Ok(tungstenite::Message::Ping(data))) => {
684                            tracing::trace!("Responding to ping");
685                            if let Err(err) = socket.send(tungstenite::Message::Pong(data)).await {
686                                tracing::warn!("Failed to send pong: {:?}", err);
687                                break;
688                            }
689                        }
690                        // Handle Pong responses
691                        Some(Ok(tungstenite::Message::Pong(data))) => {
692                            tracing::trace!("Received pong from server");
693                            if let Some(expected_data) = ping_data.take() {
694                                if data != expected_data {
695                                    tracing::warn!("Server responded with invalid pong data");
696                                    break;
697                                }
698                            } else {
699                                tracing::warn!("Received unexpected pong from order-stream server");
700                            }
701                        }
702                        Some(Ok(tungstenite::Message::Close(msg))) => {
703                            tracing::warn!("Server closed the order stream connection with reason: {:?}", msg.map(|m| m.to_string()));
704                            break;
705                        }
706                        Some(Ok(other)) => {
707                            tracing::debug!("Ignoring non-text message: {:?}", other);
708                            continue;
709                        }
710                        Some(Err(err)) => {
711                            tracing::warn!("order stream socket error: {:?}", err);
712                            break;
713                        }
714                        None => {
715                            tracing::warn!("order stream socket closed unexpectedly");
716                            break;
717                        }
718                    }
719                }
720                // Send periodic pings
721                _ = ping_interval.tick() => {
722                    // If we still have a pending ping that hasn't been responded to
723                    if ping_data.is_some() {
724                        tracing::warn!("Server did not respond to ping, closing connection");
725                        break;
726                    }
727
728                    tracing::trace!("Sending ping to server");
729                    let random_bytes: Vec<u8> = (0..16).map(|_| rand::random::<u8>()).collect();
730                    if let Err(err) = socket.send(tungstenite::Message::Ping(random_bytes.clone())).await {
731                        tracing::warn!("Failed to send ping: {:?}", err);
732                        break;
733                    }
734                    ping_data = Some(random_bytes);
735                }
736            }
737        }
738    })
739}
740
#[cfg(test)]
mod tests {
    use super::*;
    use alloy::signers::local::LocalSigner;
    use std::borrow::Cow;
    use std::time::Duration;
    use tokio::net::TcpListener;
    use tokio_tungstenite::accept_async;
    use tungstenite::protocol::{frame::coding::CloseCode, CloseFrame};

    /// Nonce shared by the auth message tests.
    fn test_nonce() -> Nonce {
        Nonce { nonce: "TEST_NONCE".to_string() }
    }

    /// Build an [`AuthMsg`] for `http://localhost:8585`, signed by a fresh random signer.
    async fn signed_auth_msg(nonce: &Nonce) -> AuthMsg {
        let signer = LocalSigner::random();
        let origin = "http://localhost:8585".parse().unwrap();
        AuthMsg::new(nonce.clone(), &origin, &signer).await.unwrap()
    }

    /// A freshly built message carries the build's version info and verifies.
    #[tokio::test]
    async fn auth_msg_verify() {
        let expected_version = env!("CARGO_PKG_VERSION");
        let expected_git_hash = option_env!("BOUNDLESS_GIT_HASH").unwrap_or("unknown");
        let nonce = test_nonce();
        let auth_msg = signed_auth_msg(&nonce).await;

        let version_info = auth_msg.version_info();
        println!("VersionInfo: {}", version_info);
        assert_eq!(version_info.version, expected_version);
        assert_eq!(version_info.git_hash, expected_git_hash);

        auth_msg.verify("localhost:8585", &nonce.nonce).await.unwrap();
    }

    /// Verifying against a different domain must fail.
    #[tokio::test]
    #[should_panic(expected = "Message domain does not match")]
    async fn auth_msg_bad_origin() {
        let nonce = test_nonce();
        let auth_msg = signed_auth_msg(&nonce).await;
        auth_msg.verify("boundless.xyz", &nonce.nonce).await.unwrap();
    }

    /// Verifying against a different nonce must fail.
    #[tokio::test]
    #[should_panic(expected = "Message nonce does not match")]
    async fn auth_msg_bad_nonce() {
        let nonce = test_nonce();
        let auth_msg = signed_auth_msg(&nonce).await;
        auth_msg.verify("localhost:8585", "BAD_NONCE").await.unwrap();
    }

    /// A close frame from the server ends the stream and its reason is logged.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn order_stream_ends_on_close_with_reason() {
        // Port 0 lets the OS pick a free port for the throwaway server.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        // Server side: accept one connection and immediately close it with a reason.
        let server_task = tokio::spawn(async move {
            let (tcp, _) = listener.accept().await.unwrap();
            let mut ws = accept_async(tcp).await.unwrap();
            let frame =
                CloseFrame { code: CloseCode::Normal, reason: Cow::Borrowed("test reason") };
            ws.send(tungstenite::Message::Close(Some(frame))).await.unwrap();
        });

        let (socket, _) = connect_async(format!("ws://{}/ws", addr)).await.unwrap();
        let mut orders = order_stream(socket);

        let first = tokio::time::timeout(Duration::from_secs(2), orders.next()).await.unwrap();
        assert!(first.is_none());
        assert!(logs_contain("Server closed the order stream connection with reason:"));
        assert!(logs_contain("test reason"));

        server_task.await.unwrap();
    }
}
811}