1use std::sync::Arc;
24use std::time::Duration;
25
26use rmcp::model::{Annotated, RawResource, RawResourceTemplate, Resource, ResourceTemplate};
27
28use crate::context::AdapterContext;
29use crate::error::AdapterError;
30
31pub mod live;
32pub mod static_;
33
34use live::{BookSnapshot, LiveRegistry, SubscriptionProvider, TickerSnapshot, TradeUpdate};
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub enum ResourceUri {
45 Currencies,
47 Instruments {
50 currency: String,
52 },
53 Book {
55 instrument: String,
57 },
58 Ticker {
60 instrument: String,
62 },
63 Trades {
65 instrument: String,
67 },
68}
69
70impl ResourceUri {
71 #[must_use]
73 pub fn to_uri(&self) -> String {
74 match self {
75 Self::Currencies => "deribit://currencies".to_string(),
76 Self::Instruments { currency } => format!("deribit://instruments/{currency}"),
77 Self::Book { instrument } => format!("deribit://book/{instrument}"),
78 Self::Ticker { instrument } => format!("deribit://ticker/{instrument}"),
79 Self::Trades { instrument } => format!("deribit://trades/{instrument}"),
80 }
81 }
82}
83
84const SCHEME: &str = "deribit://";
86
87pub fn parse_resource_uri(s: &str) -> Result<ResourceUri, AdapterError> {
94 let rest = s
95 .strip_prefix(SCHEME)
96 .ok_or_else(|| AdapterError::validation("uri", format!("not a `{SCHEME}` URI: {s}")))?;
97 if rest.is_empty() {
98 return Err(AdapterError::validation("uri", "empty resource path"));
99 }
100 let mut segments = rest.splitn(2, '/');
101 let head = segments
102 .next()
103 .ok_or_else(|| AdapterError::validation("uri", "missing resource head"))?;
104 let tail = segments.next().filter(|s| !s.is_empty());
108
109 match (head, tail) {
110 ("currencies", None) => Ok(ResourceUri::Currencies),
111 ("currencies", Some(_)) => Err(AdapterError::validation(
112 "uri",
113 "`deribit://currencies` takes no path",
114 )),
115 ("instruments", Some(currency)) => {
116 let currency = parse_currency(currency)?;
117 Ok(ResourceUri::Instruments { currency })
118 }
119 ("instruments", None) => Err(AdapterError::validation(
120 "uri",
121 "`deribit://instruments/{currency}` requires a currency",
122 )),
123 ("book", Some(instrument)) => {
124 let instrument = parse_instrument_name(instrument)?;
125 Ok(ResourceUri::Book { instrument })
126 }
127 ("ticker", Some(instrument)) => {
128 let instrument = parse_instrument_name(instrument)?;
129 Ok(ResourceUri::Ticker { instrument })
130 }
131 ("trades", Some(instrument)) => {
132 let instrument = parse_instrument_name(instrument)?;
133 Ok(ResourceUri::Trades { instrument })
134 }
135 ("book" | "ticker" | "trades", None) => Err(AdapterError::validation(
136 "uri",
137 format!("`deribit://{head}/{{instrument}}` requires an instrument"),
138 )),
139 (other, _) => Err(AdapterError::validation(
140 "uri",
141 format!("unknown resource head: `{other}`"),
142 )),
143 }
144}
145
146fn parse_currency(s: &str) -> Result<String, AdapterError> {
153 if s.is_empty() || s.len() > 8 {
154 return Err(AdapterError::validation(
155 "currency",
156 format!("expected 1..=8 chars, got {} for `{s}`", s.len()),
157 ));
158 }
159 if !s
160 .chars()
161 .all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '_')
162 {
163 return Err(AdapterError::validation(
164 "currency",
165 format!("expected `[A-Z0-9_]`, got `{s}`"),
166 ));
167 }
168 Ok(s.to_string())
169}
170
171fn parse_instrument_name(s: &str) -> Result<String, AdapterError> {
178 if s.is_empty() || s.len() > 64 {
179 return Err(AdapterError::validation(
180 "instrument",
181 format!("expected 1..=64 chars, got {} for `{s}`", s.len()),
182 ));
183 }
184 if !s
185 .chars()
186 .all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '-' || c == '_')
187 {
188 return Err(AdapterError::validation(
189 "instrument",
190 format!("expected `[A-Z0-9_-]`, got `{s}`"),
191 ));
192 }
193 Ok(s.to_string())
194}
195
196#[derive(Debug, Default, Clone)]
199pub struct ResourceList {
200 pub resources: Vec<Resource>,
202 pub templates: Vec<ResourceTemplate>,
204}
205
206#[derive(Debug, Clone, PartialEq)]
212pub enum ResourceContent {
213 Json(serde_json::Value),
216}
217
218#[derive(Clone)]
220pub struct ResourceRegistry {
221 list: ResourceList,
222 live: LiveRegistry,
223 provider: Option<Arc<dyn SubscriptionProvider>>,
228}
229
230impl std::fmt::Debug for ResourceRegistry {
231 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232 f.debug_struct("ResourceRegistry")
233 .field("list", &self.list)
234 .field("live", &self.live)
235 .field(
236 "provider",
237 &self.provider.as_ref().map(|_| "<dyn SubscriptionProvider>"),
238 )
239 .finish()
240 }
241}
242
243impl Default for ResourceRegistry {
244 fn default() -> Self {
245 Self {
246 list: ResourceList::default(),
247 live: LiveRegistry::new(),
248 provider: None,
249 }
250 }
251}
252
253const FIRST_FRAME_TIMEOUT: Duration = Duration::from_secs(5);
257
258impl ResourceRegistry {
259 #[must_use]
261 pub fn new() -> Self {
262 Self::default()
263 }
264
265 #[must_use]
270 pub fn with_subscription_provider(mut self, provider: Arc<dyn SubscriptionProvider>) -> Self {
271 self.provider = Some(provider);
272 self
273 }
274
275 #[must_use]
290 pub fn build() -> Self {
291 let mut list = ResourceList::default();
292 list.resources.push(make_resource(
293 "deribit://currencies",
294 "Deribit currency catalogue",
295 "Static list of Deribit currency symbols and metadata.",
296 ));
297 list.templates.push(make_template(
298 "deribit://instruments/{currency}",
299 "Deribit instruments by currency",
300 "Static list of instruments for a given currency.",
301 ));
302 list.templates.push(make_template(
303 "deribit://book/{instrument}",
304 "Deribit order book (live)",
305 "Order book snapshots from the `book.<instrument>.raw` \
306 channel. Read returns the latest decoded BookSnapshot \
307 when a SubscriptionProvider is configured; otherwise \
308 AdapterError::Internal.",
309 ));
310 list.templates.push(make_template(
311 "deribit://ticker/{instrument}",
312 "Deribit ticker (live)",
313 "Throttled ticker snapshots from the \
314 `ticker.<instrument>.100ms` channel. Read returns the \
315 latest TickerSnapshot when a SubscriptionProvider is \
316 configured; otherwise AdapterError::Internal.",
317 ));
318 list.templates.push(make_template(
319 "deribit://trades/{instrument}",
320 "Deribit last trades (live)",
321 "Trade events from the `trades.<instrument>.raw` channel. \
322 Read returns the most recent N TradeUpdate values \
323 (newest first) when a SubscriptionProvider is configured; \
324 otherwise AdapterError::Internal.",
325 ));
326 Self {
327 list,
328 live: LiveRegistry::new(),
329 provider: None,
330 }
331 }
332
333 #[must_use]
335 pub fn resources(&self) -> Vec<Resource> {
336 self.list.resources.clone()
337 }
338
339 #[must_use]
341 pub fn templates(&self) -> Vec<ResourceTemplate> {
342 self.list.templates.clone()
343 }
344
345 #[must_use]
347 pub fn list(&self) -> ResourceList {
348 self.list.clone()
349 }
350
351 #[must_use]
353 pub fn is_empty(&self) -> bool {
354 self.list.resources.is_empty() && self.list.templates.is_empty()
355 }
356
357 pub async fn read(
372 &self,
373 ctx: &AdapterContext,
374 uri: &ResourceUri,
375 ) -> Result<ResourceContent, AdapterError> {
376 match uri {
377 ResourceUri::Currencies => {
378 Ok(ResourceContent::Json(static_::read_currencies(ctx).await?))
379 }
380 ResourceUri::Instruments { currency } => Ok(ResourceContent::Json(
381 static_::read_instruments(ctx, currency).await?,
382 )),
383 ResourceUri::Book { instrument } => {
384 let value = self.read_live(uri).await?;
385 let book = BookSnapshot::from_value(instrument, &value)?;
386 Ok(ResourceContent::Json(serde_json::to_value(&book)?))
387 }
388 ResourceUri::Ticker { instrument } => {
389 let value = self.read_live(uri).await?;
390 let ticker = TickerSnapshot::from_value(instrument, &value)?;
391 Ok(ResourceContent::Json(serde_json::to_value(&ticker)?))
392 }
393 ResourceUri::Trades { .. } => {
394 use tokio::sync::broadcast::error::RecvError;
402 let provider = self.provider.as_ref().ok_or_else(|| {
403 AdapterError::internal("live subscription provider not configured")
404 })?;
405 let handle = self.live.subscribe(provider.as_ref(), uri).await?;
406 let mut updates = handle.updates();
407 if handle.latest().await.is_none() {
408 match tokio::time::timeout(FIRST_FRAME_TIMEOUT, updates.recv()).await {
409 Ok(Ok(())) | Ok(Err(RecvError::Lagged(_))) => {}
410 Ok(Err(RecvError::Closed)) => {
411 return Err(AdapterError::internal(
412 "live subscription closed before producing a frame",
413 ));
414 }
415 Err(_elapsed) => {
416 return Err(AdapterError::internal(
417 "live subscription did not produce a frame in time",
418 ));
419 }
420 }
421 }
422 let mut trades: Vec<TradeUpdate> = Vec::new();
423 for frame in handle.history().await {
424 let mut decoded = TradeUpdate::batch_from_value(&frame)?;
428 trades.append(&mut decoded);
429 }
430 trades.sort_by_key(|t| std::cmp::Reverse(t.timestamp));
431 trades.truncate(live::HISTORY_CAPACITY);
432 Ok(ResourceContent::Json(serde_json::to_value(&trades)?))
433 }
434 }
435 }
436
437 async fn read_live(&self, uri: &ResourceUri) -> Result<serde_json::Value, AdapterError> {
444 use tokio::sync::broadcast::error::RecvError;
445
446 let provider = self
447 .provider
448 .as_ref()
449 .ok_or_else(|| AdapterError::internal("live subscription provider not configured"))?;
450 let handle = self.live.subscribe(provider.as_ref(), uri).await?;
451 let mut updates = handle.updates();
457 if let Some(snapshot) = handle.latest().await {
458 return Ok(snapshot);
459 }
460 match tokio::time::timeout(FIRST_FRAME_TIMEOUT, updates.recv()).await {
461 Ok(Ok(())) => handle
462 .latest()
463 .await
464 .ok_or_else(|| AdapterError::internal("update fired without snapshot")),
465 Ok(Err(RecvError::Lagged(_))) => handle
466 .latest()
467 .await
468 .ok_or_else(|| AdapterError::internal("broadcast lagged before first frame")),
469 Ok(Err(RecvError::Closed)) => Err(AdapterError::internal(
470 "live subscription closed before producing a frame",
471 )),
472 Err(_elapsed) => Err(AdapterError::internal(
473 "live subscription did not produce a frame in time",
474 )),
475 }
476 }
477}
478
479fn make_resource(uri: &'static str, name: &'static str, description: &'static str) -> Resource {
480 let raw = RawResource {
481 uri: uri.to_string(),
482 name: name.to_string(),
483 title: None,
484 description: Some(description.to_string()),
485 mime_type: Some("application/json".to_string()),
486 size: None,
487 icons: None,
488 meta: None,
489 };
490 Annotated {
491 raw,
492 annotations: None,
493 }
494}
495
496fn make_template(
497 template: &'static str,
498 name: &'static str,
499 description: &'static str,
500) -> ResourceTemplate {
501 let raw = RawResourceTemplate {
502 uri_template: template.to_string(),
503 name: name.to_string(),
504 title: None,
505 description: Some(description.to_string()),
506 mime_type: Some("application/json".to_string()),
507 icons: None,
508 };
509 Annotated {
510 raw,
511 annotations: None,
512 }
513}
514
515#[cfg(test)]
516mod tests {
517 use super::*;
518
519 #[test]
520 fn parses_currencies() {
521 assert_eq!(
522 parse_resource_uri("deribit://currencies").unwrap(),
523 ResourceUri::Currencies
524 );
525 }
526
527 #[test]
528 fn parses_instruments_with_currency() {
529 assert_eq!(
530 parse_resource_uri("deribit://instruments/BTC").unwrap(),
531 ResourceUri::Instruments {
532 currency: "BTC".to_string()
533 }
534 );
535 }
536
537 #[test]
538 fn parses_book_template() {
539 assert_eq!(
540 parse_resource_uri("deribit://book/BTC-PERPETUAL").unwrap(),
541 ResourceUri::Book {
542 instrument: "BTC-PERPETUAL".to_string()
543 }
544 );
545 }
546
547 #[test]
548 fn parses_ticker_and_trades() {
549 assert!(matches!(
550 parse_resource_uri("deribit://ticker/ETH-PERPETUAL").unwrap(),
551 ResourceUri::Ticker { .. }
552 ));
553 assert!(matches!(
554 parse_resource_uri("deribit://trades/BTC-31MAY24-50000-C").unwrap(),
555 ResourceUri::Trades { .. }
556 ));
557 }
558
559 #[test]
560 fn rejects_non_deribit_scheme() {
561 let err = parse_resource_uri("foo://bar").unwrap_err();
562 match err {
563 AdapterError::Validation { field, .. } => assert_eq!(field, "uri"),
564 other => panic!("unexpected: {other:?}"),
565 }
566 }
567
568 #[test]
569 fn rejects_currencies_with_path() {
570 let err = parse_resource_uri("deribit://currencies/extra").unwrap_err();
571 assert!(matches!(err, AdapterError::Validation { .. }));
572 }
573
574 #[test]
575 fn rejects_instruments_without_currency() {
576 let err = parse_resource_uri("deribit://instruments/").unwrap_err();
577 assert!(matches!(err, AdapterError::Validation { .. }));
578 }
579
580 #[test]
581 fn rejects_unknown_head() {
582 let err = parse_resource_uri("deribit://options/BTC").unwrap_err();
583 assert!(matches!(err, AdapterError::Validation { .. }));
584 }
585
586 #[test]
587 fn rejects_lowercase_currency() {
588 let err = parse_resource_uri("deribit://instruments/btc").unwrap_err();
589 match err {
590 AdapterError::Validation { field, .. } => assert_eq!(field, "currency"),
591 other => panic!("unexpected: {other:?}"),
592 }
593 }
594
595 #[test]
596 fn rejects_overlong_instrument() {
597 let long = "X".repeat(65);
598 let uri = format!("deribit://book/{long}");
599 let err = parse_resource_uri(&uri).unwrap_err();
600 match err {
601 AdapterError::Validation { field, .. } => assert_eq!(field, "instrument"),
602 other => panic!("unexpected: {other:?}"),
603 }
604 }
605
606 #[test]
607 fn round_trip_to_uri() {
608 for original in [
609 "deribit://currencies",
610 "deribit://instruments/BTC",
611 "deribit://book/BTC-PERPETUAL",
612 "deribit://ticker/ETH-PERPETUAL",
613 "deribit://trades/BTC-31MAY24-50000-C",
614 ] {
615 let parsed = parse_resource_uri(original).unwrap();
616 assert_eq!(parsed.to_uri(), original);
617 }
618 }
619
620 #[test]
621 fn registry_build_lists_static_currency_entry() {
622 let r = ResourceRegistry::build();
623 assert_eq!(r.resources().len(), 1);
624 assert_eq!(r.resources()[0].raw.uri, "deribit://currencies");
625 }
626
627 #[test]
628 fn registry_build_lists_four_templates() {
629 let r = ResourceRegistry::build();
630 let templates = r.templates();
631 assert_eq!(templates.len(), 4);
632 let uris: Vec<&str> = templates
633 .iter()
634 .map(|t| t.raw.uri_template.as_str())
635 .collect();
636 assert!(uris.contains(&"deribit://instruments/{currency}"));
637 assert!(uris.contains(&"deribit://book/{instrument}"));
638 assert!(uris.contains(&"deribit://ticker/{instrument}"));
639 assert!(uris.contains(&"deribit://trades/{instrument}"));
640 }
641
642 fn ctx() -> AdapterContext {
643 use crate::config::{Config, LogFormat, OrderTransport, Transport};
644 use std::net::SocketAddr;
645 use std::sync::Arc;
646 let cfg = Config {
647 endpoint: "https://test.deribit.com".to_string(),
648 client_id: None,
649 client_secret: None,
650 allow_trading: false,
651 max_order_usd: None,
652 transport: Transport::Stdio,
653 http_listen: SocketAddr::from(([127, 0, 0, 1], 8723)),
654 http_bearer_token: None,
655 log_format: LogFormat::Text,
656 order_transport: OrderTransport::Http,
657 };
658 AdapterContext::new(Arc::new(cfg)).expect("ctx")
659 }
660
661 #[tokio::test]
662 async fn read_live_book_uses_provider_and_returns_snapshot() {
663 use crate::resources::live::{SubscriptionProvider, SubscriptionStream};
664 use std::future::Future;
665 use std::pin::Pin;
666 use std::sync::Arc;
667 use std::sync::atomic::{AtomicU64, Ordering};
668
669 struct CountingProvider {
672 opened: Arc<AtomicU64>,
673 }
674 impl SubscriptionProvider for CountingProvider {
675 fn subscribe(
676 &self,
677 _uri: ResourceUri,
678 ) -> Pin<Box<dyn Future<Output = Result<SubscriptionStream, AdapterError>> + Send + '_>>
679 {
680 self.opened.fetch_add(1, Ordering::AcqRel);
681 Box::pin(async move {
682 let frame = serde_json::json!({
683 "bids": [[50_000.0, 1.0], [49_999.0, 2.0]],
684 "asks": [[50_001.0, 1.5]],
685 "change_id": 42_u64,
686 "timestamp": 1_700_000_000_000_i64,
687 });
688 let stream = futures_util::stream::iter(vec![Ok::<_, AdapterError>(frame)]);
689 Ok(Box::pin(stream) as SubscriptionStream)
690 })
691 }
692 }
693
694 let opened = Arc::new(AtomicU64::new(0));
695 let provider = Arc::new(CountingProvider {
696 opened: opened.clone(),
697 });
698 let registry = ResourceRegistry::build().with_subscription_provider(provider);
699 let uri = ResourceUri::Book {
700 instrument: "BTC-PERPETUAL".to_string(),
701 };
702
703 let content = registry.read(&ctx(), &uri).await.expect("ok");
704 match content {
705 ResourceContent::Json(value) => {
706 assert_eq!(
707 value.get("instrument").and_then(|v| v.as_str()),
708 Some("BTC-PERPETUAL")
709 );
710 assert_eq!(value.get("change_id").and_then(|v| v.as_u64()), Some(42));
711 assert!(value.get("bids").and_then(|v| v.as_array()).is_some());
712 }
713 }
714
715 let _ = registry.read(&ctx(), &uri).await.expect("ok");
723 let opens = opened.load(Ordering::Acquire);
724 assert!(
725 (1..=2).contains(&opens),
726 "expected provider opens in 1..=2, got {opens}"
727 );
728 }
729
730 #[tokio::test]
731 async fn read_live_book_without_provider_returns_internal() {
732 let registry = ResourceRegistry::build();
733 let uri = ResourceUri::Book {
734 instrument: "BTC-PERPETUAL".to_string(),
735 };
736 let err = registry.read(&ctx(), &uri).await.unwrap_err();
737 assert!(matches!(err, AdapterError::Internal { .. }));
738 }
739
740 #[tokio::test]
741 async fn read_live_trades_uses_provider_and_returns_chronological_history() {
742 use crate::resources::live::{SubscriptionProvider, SubscriptionStream};
743 use std::future::Future;
744 use std::pin::Pin;
745 use std::sync::Arc;
746
747 struct StubProvider;
748 impl SubscriptionProvider for StubProvider {
749 fn subscribe(
750 &self,
751 _uri: ResourceUri,
752 ) -> Pin<Box<dyn Future<Output = Result<SubscriptionStream, AdapterError>> + Send + '_>>
753 {
754 Box::pin(async move {
755 let frames = vec![
757 Ok::<_, AdapterError>(serde_json::json!([
758 {
759 "direction": "buy",
760 "price": 50_001.0,
761 "amount": 1.0,
762 "trade_id": "t1",
763 "timestamp": 1_700_000_000_001_i64
764 }
765 ])),
766 Ok::<_, AdapterError>(serde_json::json!([
767 {
768 "direction": "sell",
769 "price": 50_002.0,
770 "amount": 0.5,
771 "trade_id": "t2",
772 "timestamp": 1_700_000_000_002_i64,
773 "liquidation": "M",
774 "tick_direction": 1_i64,
775 "mark_price": 50_001.5,
776 "index_price": 50_010.0
777 }
778 ])),
779 ];
780 let stream = futures_util::stream::iter(frames);
781 Ok(Box::pin(stream) as SubscriptionStream)
782 })
783 }
784 }
785
786 let registry = ResourceRegistry::build().with_subscription_provider(Arc::new(StubProvider));
787 let uri = ResourceUri::Trades {
788 instrument: "BTC-PERPETUAL".to_string(),
789 };
790 let _ = registry.read(&ctx(), &uri).await.expect("first ok");
792 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
793 let content = registry.read(&ctx(), &uri).await.expect("second ok");
794 let ResourceContent::Json(value) = content;
795 let trades = value.as_array().expect("array");
796 let ids: Vec<&str> = trades
797 .iter()
798 .filter_map(|t| t.get("trade_id").and_then(|v| v.as_str()))
799 .collect();
800 assert_eq!(
801 ids,
802 vec!["t2", "t1"],
803 "expected newest-first ordering; got {ids:?}"
804 );
805 let t2 = &trades[0];
806 assert_eq!(
807 t2.get("liquidation").and_then(|v| v.as_str()),
808 Some("M"),
809 "t2 should carry the liquidation marker"
810 );
811 assert_eq!(t2.get("tick_direction").and_then(|v| v.as_i64()), Some(1));
812 assert_eq!(
813 t2.get("mark_price").and_then(|v| v.as_f64()),
814 Some(50_001.5)
815 );
816 assert_eq!(
817 t2.get("index_price").and_then(|v| v.as_f64()),
818 Some(50_010.0)
819 );
820 }
821
822 #[tokio::test]
823 async fn read_live_ticker_uses_provider_and_returns_snapshot_with_greeks() {
824 use crate::resources::live::{SubscriptionProvider, SubscriptionStream};
825 use std::future::Future;
826 use std::pin::Pin;
827 use std::sync::Arc;
828
829 struct StubProvider;
830 impl SubscriptionProvider for StubProvider {
831 fn subscribe(
832 &self,
833 _uri: ResourceUri,
834 ) -> Pin<Box<dyn Future<Output = Result<SubscriptionStream, AdapterError>> + Send + '_>>
835 {
836 Box::pin(async move {
837 let frame = serde_json::json!({
838 "mark_price": 50_000.5,
839 "index_price": 50_010.0,
840 "best_bid_price": 50_000.0,
841 "best_ask_price": 50_001.0,
842 "last_price": 49_999.5,
843 "mark_iv": 65.0,
844 "greeks": {
845 "delta": 0.55,
846 "gamma": 0.0001,
847 "vega": 12.3,
848 "theta": -0.4,
849 "rho": 0.05
850 },
851 "timestamp": 1_700_000_000_000_i64,
852 });
853 let stream = futures_util::stream::iter(vec![Ok::<_, AdapterError>(frame)]);
854 Ok(Box::pin(stream) as SubscriptionStream)
855 })
856 }
857 }
858
859 let registry = ResourceRegistry::build().with_subscription_provider(Arc::new(StubProvider));
860 let uri = ResourceUri::Ticker {
861 instrument: "BTC-31MAY24-50000-C".to_string(),
862 };
863 let content = registry.read(&ctx(), &uri).await.expect("ok");
864 let ResourceContent::Json(value) = content;
865 assert_eq!(
866 value.get("instrument").and_then(|v| v.as_str()),
867 Some("BTC-31MAY24-50000-C")
868 );
869 assert_eq!(
870 value.get("mark_price").and_then(|v| v.as_f64()),
871 Some(50_000.5)
872 );
873 assert_eq!(value.get("delta").and_then(|v| v.as_f64()), Some(0.55));
874 assert_eq!(value.get("gamma").and_then(|v| v.as_f64()), Some(0.0001));
875 assert_eq!(value.get("vega").and_then(|v| v.as_f64()), Some(12.3));
876 }
877
878 #[tokio::test]
879 async fn read_live_ticker_perp_omits_greeks() {
880 use crate::resources::live::{SubscriptionProvider, SubscriptionStream};
881 use std::future::Future;
882 use std::pin::Pin;
883 use std::sync::Arc;
884
885 struct StubProvider;
886 impl SubscriptionProvider for StubProvider {
887 fn subscribe(
888 &self,
889 _uri: ResourceUri,
890 ) -> Pin<Box<dyn Future<Output = Result<SubscriptionStream, AdapterError>> + Send + '_>>
891 {
892 Box::pin(async move {
893 let frame = serde_json::json!({
894 "mark_price": 50_000.5,
895 "best_bid_price": 50_000.0,
896 "best_ask_price": 50_001.0,
897 "last_price": 49_999.5,
898 "timestamp": 1_700_000_000_000_i64,
899 });
900 let stream = futures_util::stream::iter(vec![Ok::<_, AdapterError>(frame)]);
901 Ok(Box::pin(stream) as SubscriptionStream)
902 })
903 }
904 }
905
906 let registry = ResourceRegistry::build().with_subscription_provider(Arc::new(StubProvider));
907 let uri = ResourceUri::Ticker {
908 instrument: "BTC-PERPETUAL".to_string(),
909 };
910 let content = registry.read(&ctx(), &uri).await.expect("ok");
911 let ResourceContent::Json(value) = content;
912 assert!(value.get("delta").is_none());
915 assert!(value.get("gamma").is_none());
916 assert!(value.get("vega").is_none());
917 assert!(value.get("mark_iv").is_none());
918 }
919}