1use std::{fmt::Display, pin::Pin};
16
17use alloy::{
18 primitives::{Address, Signature, U256},
19 signers::{Error as SignerErr, Signer},
20};
21use alloy_primitives::B256;
22use anyhow::{Context, Result};
23use async_stream::stream;
24use chrono::{DateTime, Utc};
25use futures_util::{SinkExt, Stream, StreamExt};
26use reqwest::Url;
27use serde::{Deserialize, Serialize};
28use siwe::Message as SiweMsg;
29use thiserror::Error;
30use time::OffsetDateTime;
31use tokio::net::TcpStream;
32use tokio_tungstenite::{
33 connect_async, tungstenite, tungstenite::client::IntoClientRequest, MaybeTlsStream,
34 WebSocketStream,
35};
36use utoipa::ToSchema;
37
38use crate::contracts::{ProofRequest, RequestError};
39
/// Path of the order submission endpoint.
pub const ORDER_SUBMISSION_PATH: &str = "/api/v1/submit_order";
/// Path of the order listing endpoint; per-request lookups append `/{id}` to it.
pub const ORDER_LIST_PATH: &str = "/api/v1/orders";
/// Path of the v2 order listing endpoint (cursor-based pagination).
pub const ORDER_LIST_PATH_V2: &str = "/api/v2/orders";
/// Path prefix of the nonce endpoint; the caller's address is joined onto it.
pub const AUTH_GET_NONCE: &str = "/api/v1/nonce/";
/// Path of the health check endpoint.
pub const HEALTH_CHECK: &str = "/api/v1/health";
/// Path of the order websocket endpoint.
pub const ORDER_WS_PATH: &str = "/ws/v1/orders";
52
53#[derive(Debug, Deserialize, Serialize, ToSchema)]
55pub struct ErrMsg {
56 pub r#type: String,
58 pub msg: String,
60}
61impl ErrMsg {
62 pub fn new(r#type: &str, msg: &str) -> Self {
64 Self { r#type: r#type.into(), msg: msg.into() }
65 }
66}
67impl std::fmt::Display for ErrMsg {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 write!(f, "error_type: {} msg: {}", self.r#type, self.msg)
70 }
71}
72
73#[derive(Error, Debug)]
75#[non_exhaustive]
76pub enum OrderError {
77 #[error("invalid signature: {0}")]
78 InvalidSignature(SignerErr),
80 #[error("request error: {0}")]
81 RequestError(#[from] RequestError),
83}
84
85#[derive(Serialize, Deserialize, ToSchema, Debug, Clone, PartialEq)]
89pub struct Order {
90 #[schema(value_type = Object)]
92 pub request: ProofRequest,
93 #[schema(value_type = Object)]
95 pub request_digest: B256,
96 #[schema(value_type = Object)]
99 pub signature: Signature,
100}
101
102#[derive(Serialize, Deserialize, ToSchema, Debug, Clone)]
104pub struct OrderData {
105 pub id: i64,
107 pub order: Order,
109 #[schema(value_type = String)]
111 pub created_at: DateTime<Utc>,
112}
113
114#[derive(Serialize, Deserialize, ToSchema, Debug, Clone)]
116pub struct Nonce {
117 pub nonce: String,
119}
120
121#[derive(Serialize, Deserialize, ToSchema, Debug, Clone)]
123pub struct SubmitOrderRes {
124 pub status: String,
126 #[schema(value_type = Object)]
128 pub request_id: U256,
129}
130
131#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
133#[serde(rename_all = "lowercase")]
134pub enum SortDirection {
135 Asc,
137 Desc,
139}
140
141#[derive(Serialize, Deserialize, ToSchema, Debug, Clone)]
143pub struct ListOrdersV2Response {
144 pub orders: Vec<OrderData>,
146 pub next_cursor: Option<String>,
148 pub has_more: bool,
150}
151
152impl Order {
153 pub fn new(request: ProofRequest, request_digest: B256, signature: Signature) -> Self {
155 Self { request, request_digest, signature }
156 }
157
158 pub fn validate(&self, market_address: Address, chain_id: u64) -> Result<(), OrderError> {
160 self.request.validate()?;
161 let hash = self.request.signing_hash(market_address, chain_id)?;
162 if hash != self.request_digest {
163 return Err(OrderError::RequestError(RequestError::DigestMismatch));
164 }
165 self.request.verify_signature(
166 &self.signature.as_bytes().into(),
167 market_address,
168 chain_id,
169 )?;
170 Ok(())
171 }
172}
173
174#[derive(Deserialize, Serialize, ToSchema, Debug, Clone)]
176pub struct AuthMsg {
177 #[schema(value_type = Object)]
179 message: SiweMsg,
180 #[schema(value_type = Object)]
182 signature: Signature,
183}
184
185#[derive(Deserialize, Serialize, ToSchema, Debug, Clone)]
187pub struct VersionInfo {
188 pub version: String,
190 pub git_hash: String,
192}
193
194impl Display for VersionInfo {
195 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196 write!(f, "Boundless Order Stream\nVersion={}\nGit hash={}", self.version, self.git_hash)
197 }
198}
199
200impl From<&SiweMsg> for VersionInfo {
201 fn from(msg: &SiweMsg) -> Self {
202 let mut version = "unknown".to_string();
203 let mut git_hash = "unknown".to_string();
204 if let Some(statement) = &msg.statement {
205 let parts: Vec<&str> = statement.split(':').collect();
206 if parts.len() == 3 && parts[0] == "Boundless Order Stream" {
207 version = parts[1].to_string();
208 git_hash = parts[2].to_string();
209 }
210 }
211 Self { version, git_hash }
212 }
213}
214
215impl From<&AuthMsg> for VersionInfo {
216 fn from(auth_msg: &AuthMsg) -> Self {
217 VersionInfo::from(&auth_msg.message)
218 }
219}
220
221impl AuthMsg {
222 pub async fn new(nonce: Nonce, origin: &Url, signer: &impl Signer) -> Result<Self> {
224 let version = env!("CARGO_PKG_VERSION");
225 let git_hash = option_env!("BOUNDLESS_GIT_HASH").unwrap_or("unknown");
226 let message = format!(
227 "{} wants you to sign in with your Ethereum account:\n{}\n\nBoundless Order Stream:{version}:{git_hash}\n\nURI: {}\nVersion: 1\nChain ID: 1\nNonce: {}\nIssued At: {}",
228 origin.authority(), signer.address(), origin, nonce.nonce, Utc::now().to_rfc3339(),
229 );
230 let message: SiweMsg = message.parse()?;
231
232 let signature = signer
233 .sign_hash(&message.eip191_hash().context("Failed to generate eip191 hash")?.into())
234 .await?;
235
236 Ok(Self { message, signature })
237 }
238
239 pub async fn verify(&self, domain: &str, nonce: &str) -> Result<()> {
241 let opts = siwe::VerificationOpts {
242 domain: Some(domain.parse().context("Invalid domain")?),
243 nonce: Some(nonce.into()),
244 timestamp: Some(OffsetDateTime::now_utc()),
245 };
246
247 self.message
248 .verify(&self.signature.as_bytes(), &opts)
249 .await
250 .context("Failed to verify SIWE message")
251 }
252
253 pub fn address(&self) -> Address {
255 Address::from(self.message.address)
256 }
257
258 pub fn version_info(&self) -> VersionInfo {
260 VersionInfo::from(self)
261 }
262}
263
264#[derive(Clone, Debug)]
266pub struct OrderStreamClient {
267 pub client: reqwest::Client,
269 pub base_url: Url,
271 pub boundless_market_address: Address,
273 pub chain_id: u64,
275 pub api_key: Option<String>,
277}
278
279impl OrderStreamClient {
280 pub fn new(base_url: Url, boundless_market_address: Address, chain_id: u64) -> Self {
282 Self {
283 client: reqwest::Client::new(),
284 base_url,
285 boundless_market_address,
286 chain_id,
287 api_key: None,
288 }
289 }
290
291 pub fn new_with_api_key(
293 base_url: Url,
294 boundless_market_address: Address,
295 chain_id: u64,
296 api_key: String,
297 ) -> Self {
298 Self {
299 client: reqwest::Client::new(),
300 base_url,
301 boundless_market_address,
302 chain_id,
303 api_key: Some(api_key),
304 }
305 }
306
307 fn add_api_key_header(&self, request: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
309 if let Some(api_key) = &self.api_key {
310 request.header("x-api-key", api_key)
311 } else {
312 request
313 }
314 }
315
316 pub async fn submit_request(
318 &self,
319 request: &ProofRequest,
320 signer: &impl Signer,
321 ) -> Result<Order> {
322 let url = self.base_url.join(ORDER_SUBMISSION_PATH)?;
323 let signature =
324 request.sign_request(signer, self.boundless_market_address, self.chain_id).await?;
325 let request_digest = request.signing_hash(self.boundless_market_address, self.chain_id)?;
326 let order = Order { request: request.clone(), request_digest, signature };
327 order.validate(self.boundless_market_address, self.chain_id)?;
328 let order_json = serde_json::to_value(&order)?;
329 let response = self
330 .add_api_key_header(
331 self.client.post(url).header("Content-Type", "application/json").json(&order_json),
332 )
333 .send()
334 .await?;
335
336 if let Err(err) = response.error_for_status_ref() {
338 let error_message = match response.json::<serde_json::Value>().await {
339 Ok(json_body) => {
340 json_body["msg"].as_str().unwrap_or("Unknown server error").to_string()
341 }
342 Err(_) => "Failed to read server error message".to_string(),
343 };
344
345 return Err(anyhow::Error::new(err).context(error_message));
346 }
347
348 Ok(order)
349 }
350
351 pub async fn fetch_order(&self, id: U256, request_digest: Option<B256>) -> Result<Order> {
355 let url = self.base_url.join(&format!("{ORDER_LIST_PATH}/{id}"))?;
356 let response = self.add_api_key_header(self.client.get(url)).send().await?;
357
358 if !response.status().is_success() {
359 let error_message = match response.json::<serde_json::Value>().await {
360 Ok(json_body) => {
361 json_body["msg"].as_str().unwrap_or("Unknown server error").to_string()
362 }
363 Err(_) => "Failed to read server error message".to_string(),
364 };
365
366 return Err(anyhow::Error::msg(error_message));
367 }
368
369 let order_data: Vec<OrderData> = response.json().await?;
370 let orders: Vec<Order> = order_data.into_iter().map(|data| data.order).collect();
371 if orders.is_empty() {
372 return Err(anyhow::Error::msg("No order found"));
373 } else if orders.len() == 1 {
374 return Ok(orders[0].clone());
375 }
376 match request_digest {
377 Some(digest) => {
378 for order in orders {
379 if order.request_digest == digest {
380 return Ok(order);
381 }
382 }
383 Err(anyhow::Error::msg("No order found"))
384 }
385 None => {
386 Err(anyhow::Error::msg("Multiple orders found, please provide a request digest"))
387 }
388 }
389 }
390
391 pub async fn fetch_order_with_timestamp(
396 &self,
397 id: U256,
398 request_digest: Option<B256>,
399 ) -> Result<(Order, DateTime<Utc>)> {
400 let url = self.base_url.join(&format!("{ORDER_LIST_PATH}/{id}"))?;
401 let response = self.add_api_key_header(self.client.get(url)).send().await?;
402
403 if !response.status().is_success() {
404 let error_message = match response.json::<serde_json::Value>().await {
405 Ok(json_body) => {
406 json_body["msg"].as_str().unwrap_or("Unknown server error").to_string()
407 }
408 Err(_) => "Failed to read server error message".to_string(),
409 };
410
411 return Err(anyhow::Error::msg(error_message));
412 }
413
414 let order_data: Vec<OrderData> = response.json().await?;
415 if order_data.is_empty() {
416 return Err(anyhow::Error::msg("No order found"));
417 } else if order_data.len() == 1 {
418 let data = &order_data[0];
419 return Ok((data.order.clone(), data.created_at));
420 }
421 match request_digest {
422 Some(digest) => {
423 for data in order_data {
424 if data.order.request_digest == digest {
425 return Ok((data.order, data.created_at));
426 }
427 }
428 Err(anyhow::Error::msg("No order found"))
429 }
430 None => {
431 Err(anyhow::Error::msg("Multiple orders found, please provide a request digest"))
432 }
433 }
434 }
435
436 pub async fn get_nonce(&self, address: Address) -> Result<Nonce> {
438 let url = self.base_url.join(AUTH_GET_NONCE)?.join(&address.to_string())?;
439 let res = self.add_api_key_header(self.client.get(url)).send().await?;
440 if !res.status().is_success() {
441 anyhow::bail!("Http error {} fetching nonce", res.status())
442 }
443 let nonce = res.json().await?;
444
445 Ok(nonce)
446 }
447
448 pub async fn list_orders_by_creation(
453 &self,
454 after: Option<DateTime<Utc>>,
455 limit: u64,
456 ) -> Result<Vec<OrderData>> {
457 let mut url = self.base_url.join(ORDER_LIST_PATH)?;
458
459 {
460 let mut query = url.query_pairs_mut();
461 query.append_pair("sort", "desc");
462 query.append_pair("limit", &limit.to_string());
463 if let Some(ts) = after {
464 query.append_pair("after", &ts.to_rfc3339());
465 }
466 }
467
468 let response = self.add_api_key_header(self.client.get(url)).send().await?;
469
470 if !response.status().is_success() {
471 let error_message = match response.json::<serde_json::Value>().await {
472 Ok(json_body) => {
473 json_body["msg"].as_str().unwrap_or("Unknown server error").to_string()
474 }
475 Err(_) => "Failed to read server error message".to_string(),
476 };
477
478 return Err(anyhow::Error::msg(error_message));
479 }
480
481 let orders: Vec<OrderData> = response.json().await?;
482 Ok(orders)
483 }
484
485 pub async fn list_orders_v2(
505 &self,
506 cursor: Option<String>,
507 limit: Option<u64>,
508 sort: Option<SortDirection>,
509 before: Option<DateTime<Utc>>,
510 after: Option<DateTime<Utc>>,
511 ) -> Result<ListOrdersV2Response> {
512 let mut url = self.base_url.join(ORDER_LIST_PATH_V2)?;
513
514 {
515 let mut query = url.query_pairs_mut();
516 if let Some(cursor_str) = cursor {
517 query.append_pair("cursor", &cursor_str);
518 }
519 if let Some(limit_val) = limit {
520 query.append_pair("limit", &limit_val.to_string());
521 }
522 if let Some(sort_val) = sort {
523 let sort_str = match sort_val {
524 SortDirection::Asc => "asc",
525 SortDirection::Desc => "desc",
526 };
527 query.append_pair("sort", sort_str);
528 }
529 if let Some(ts) = before {
530 query.append_pair("before", &ts.to_rfc3339());
531 }
532 if let Some(ts) = after {
533 query.append_pair("after", &ts.to_rfc3339());
534 }
535 }
536
537 let response = self.add_api_key_header(self.client.get(url)).send().await?;
538
539 if !response.status().is_success() {
540 let error_message = match response.json::<serde_json::Value>().await {
541 Ok(json_body) => {
542 json_body["msg"].as_str().unwrap_or("Unknown server error").to_string()
543 }
544 Err(_) => "Failed to read server error message".to_string(),
545 };
546
547 return Err(anyhow::Error::msg(error_message));
548 }
549
550 let response_data: ListOrdersV2Response = response.json().await?;
551 Ok(response_data)
552 }
553
554 pub async fn connect_async(
561 &self,
562 signer: &impl Signer,
563 ) -> Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
564 let nonce = self
565 .get_nonce(signer.address())
566 .await
567 .context("Failed to fetch nonce from order-stream")?;
568
569 let auth_msg = AuthMsg::new(nonce, &self.base_url, signer).await?;
570
571 let auth_json =
573 serde_json::to_string(&auth_msg).context("failed to serialize auth message")?;
574
575 let host = self.base_url.host().context("missing host")?.to_string();
577 let ws_scheme = if self.base_url.scheme() == "https" { "wss" } else { "ws" };
579
580 let ws_url = match self.base_url.port() {
581 Some(port) => format!("{ws_scheme}://{host}:{port}{ORDER_WS_PATH}"),
582 None => format!("{ws_scheme}://{host}{ORDER_WS_PATH}"),
583 };
584
585 let mut request =
587 ws_url.clone().into_client_request().context("failed to create request")?;
588 request
589 .headers_mut()
590 .insert("X-Auth-Data", auth_json.parse().context("failed to parse auth message")?);
591
592 if let Some(api_key) = &self.api_key {
594 request
595 .headers_mut()
596 .insert("x-api-key", api_key.parse().context("failed to parse api key")?);
597 }
598
599 let (socket, _) = match connect_async(request).await {
601 Ok(res) => res,
602 Err(tokio_tungstenite::tungstenite::Error::Http(err)) => {
603 let http_err = if let Some(http_body) = err.body() {
604 String::from_utf8_lossy(http_body)
605 } else {
606 "Empty http error body".into()
607 };
608 anyhow::bail!(
609 "Failed to connect to ws endpoint ({}): {} {}",
610 ws_url,
611 self.base_url,
612 http_err
613 );
614 }
615 Err(err) => {
616 anyhow::bail!(
617 "Failed to connect to ws endpoint ({}): {} {}",
618 ws_url,
619 self.base_url,
620 err
621 );
622 }
623 };
624 Ok(socket)
625 }
626}
627
628#[allow(clippy::type_complexity)]
645pub fn order_stream(
646 mut socket: WebSocketStream<MaybeTlsStream<TcpStream>>,
647) -> Pin<Box<dyn Stream<Item = OrderData> + Send>> {
648 Box::pin(stream! {
649 let ping_duration = match std::env::var("ORDER_STREAM_CLIENT_PING_MS") {
651 Ok(ms) => match ms.parse::<u64>() {
652 Ok(ms) => {
653 tracing::debug!("Using custom ping interval of {}ms", ms);
654 tokio::time::Duration::from_millis(ms)
655 },
656 Err(_) => {
657 tracing::warn!("Invalid ORDER_STREAM_CLIENT_PING_MS value: {}, using default", ms);
658 tokio::time::Duration::from_secs(30)
659 }
660 },
661 Err(_) => tokio::time::Duration::from_secs(30),
662 };
663
664 let mut ping_interval = tokio::time::interval(ping_duration);
665 let mut ping_data: Option<Vec<u8>> = None;
667
668 loop {
669 tokio::select! {
670 msg_result = socket.next() => {
672 match msg_result {
673 Some(Ok(tungstenite::Message::Text(msg))) => {
674 match serde_json::from_str::<OrderData>(&msg) {
675 Ok(order) => yield order,
676 Err(err) => {
677 tracing::warn!("Failed to parse order: {:?}", err);
678 continue;
679 }
680 }
681 }
682 Some(Ok(tungstenite::Message::Ping(data))) => {
684 tracing::trace!("Responding to ping");
685 if let Err(err) = socket.send(tungstenite::Message::Pong(data)).await {
686 tracing::warn!("Failed to send pong: {:?}", err);
687 break;
688 }
689 }
690 Some(Ok(tungstenite::Message::Pong(data))) => {
692 tracing::trace!("Received pong from server");
693 if let Some(expected_data) = ping_data.take() {
694 if data != expected_data {
695 tracing::warn!("Server responded with invalid pong data");
696 break;
697 }
698 } else {
699 tracing::warn!("Received unexpected pong from order-stream server");
700 }
701 }
702 Some(Ok(tungstenite::Message::Close(msg))) => {
703 tracing::warn!("Server closed the order stream connection with reason: {:?}", msg.map(|m| m.to_string()));
704 break;
705 }
706 Some(Ok(other)) => {
707 tracing::debug!("Ignoring non-text message: {:?}", other);
708 continue;
709 }
710 Some(Err(err)) => {
711 tracing::warn!("order stream socket error: {:?}", err);
712 break;
713 }
714 None => {
715 tracing::warn!("order stream socket closed unexpectedly");
716 break;
717 }
718 }
719 }
720 _ = ping_interval.tick() => {
722 if ping_data.is_some() {
724 tracing::warn!("Server did not respond to ping, closing connection");
725 break;
726 }
727
728 tracing::trace!("Sending ping to server");
729 let random_bytes: Vec<u8> = (0..16).map(|_| rand::random::<u8>()).collect();
730 if let Err(err) = socket.send(tungstenite::Message::Ping(random_bytes.clone())).await {
731 tracing::warn!("Failed to send ping: {:?}", err);
732 break;
733 }
734 ping_data = Some(random_bytes);
735 }
736 }
737 }
738 })
739}
740
#[cfg(test)]
mod tests {
    use super::*;
    use alloy::signers::local::LocalSigner;
    use std::borrow::Cow;
    use std::time::Duration;
    use tokio::net::TcpListener;
    use tokio_tungstenite::accept_async;
    use tungstenite::protocol::{frame::coding::CloseCode, CloseFrame};

    /// Build an auth message signed by a fresh random signer for
    /// `http://localhost:8585`, returning it with the nonce it was built from.
    async fn signed_auth_msg() -> (AuthMsg, Nonce) {
        let signer = LocalSigner::random();
        let nonce = Nonce { nonce: "TEST_NONCE".to_string() };
        let origin: Url = "http://localhost:8585".parse().unwrap();
        let auth_msg = AuthMsg::new(nonce.clone(), &origin, &signer).await.unwrap();
        (auth_msg, nonce)
    }

    /// A freshly signed message carries the build's version info and verifies
    /// against its own domain and nonce.
    #[tokio::test]
    async fn auth_msg_verify() {
        let expected_version = env!("CARGO_PKG_VERSION");
        let expected_git_hash = option_env!("BOUNDLESS_GIT_HASH").unwrap_or("unknown");

        let (auth_msg, nonce) = signed_auth_msg().await;

        let version_info = auth_msg.version_info();
        println!("VersionInfo: {}", version_info);
        assert!(version_info.version == expected_version);
        assert!(version_info.git_hash == expected_git_hash);
        auth_msg.verify("localhost:8585", &nonce.nonce).await.unwrap();
    }

    /// Verification against a different domain is rejected.
    #[tokio::test]
    #[should_panic(expected = "Message domain does not match")]
    async fn auth_msg_bad_origin() {
        let (auth_msg, nonce) = signed_auth_msg().await;
        auth_msg.verify("boundless.xyz", &nonce.nonce).await.unwrap();
    }

    /// Verification against a different nonce is rejected.
    #[tokio::test]
    #[should_panic(expected = "Message nonce does not match")]
    async fn auth_msg_bad_nonce() {
        let (auth_msg, _nonce) = signed_auth_msg().await;
        auth_msg.verify("localhost:8585", "BAD_NONCE").await.unwrap();
    }

    /// A close frame from the server ends the stream and its reason is logged.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn order_stream_ends_on_close_with_reason() {
        // Port 0 lets the OS pick a free port.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let server_addr = listener.local_addr().unwrap();

        // Server side: accept one websocket connection and close it immediately.
        let server_task = tokio::spawn(async move {
            let (tcp, _peer) = listener.accept().await.unwrap();
            let mut ws = accept_async(tcp).await.unwrap();
            let close_frame =
                CloseFrame { code: CloseCode::Normal, reason: Cow::Borrowed("test reason") };
            ws.send(tungstenite::Message::Close(Some(close_frame))).await.unwrap();
        });

        let url = format!("ws://{}/ws", server_addr);
        let (socket, _response) = connect_async(url).await.unwrap();
        let mut orders = order_stream(socket);

        let first = tokio::time::timeout(Duration::from_secs(2), orders.next()).await.unwrap();
        assert!(first.is_none());
        assert!(logs_contain("Server closed the order stream connection with reason:"));
        assert!(logs_contain("test reason"));

        server_task.await.unwrap();
    }
}