Skip to main content

deribit_mcp/resources/
mod.rs

1//! Resource registry, `deribit://` URI parsing, and read dispatch.
2//!
3//! Resources are partitioned by lifetime:
4//!
5//! - [`static_`] — refresh-on-read resources backed by `deribit-http`.
6//! - [`live`] — WebSocket-backed subscribable resources, populated in
7//!   v0.3 per ADR-0006.
8//!
9//! v0.1-07 shipped the URI parser, catalogue, and dispatch surface.
10//! v0.1-12 wired the two static reads:
11//! `deribit://currencies` → `static_::read_currencies`,
12//! `deribit://instruments/{currency}` → `static_::read_instruments`.
13//! v0.3-02 wires `deribit://book/{instrument}` through the
14//! [`LiveRegistry`] when a [`SubscriptionProvider`] is attached.
15//! `deribit://ticker/{instrument}` (v0.3-03) and
16//! `deribit://trades/{instrument}` (v0.3-04) are accepted by the
17//! parser but `read()` still returns
18//! [`AdapterError::Internal`] until those issues land. Without a
19//! `SubscriptionProvider`, even `Book` falls back to
20//! `AdapterError::Internal { reason: "live subscription provider
21//! not configured" }`.
22
23use std::sync::Arc;
24use std::time::Duration;
25
26use rmcp::model::{Annotated, RawResource, RawResourceTemplate, Resource, ResourceTemplate};
27
28use crate::context::AdapterContext;
29use crate::error::AdapterError;
30
31pub mod live;
32pub mod static_;
33
34use live::{BookSnapshot, LiveRegistry, SubscriptionProvider, TickerSnapshot, TradeUpdate};
35
36/// Strongly-typed `deribit://` URI variants.
37///
38/// The parser accepts every documented template; serving lives
39/// behind [`ResourceRegistry::read`]. As of v0.3-02 `Book` is
40/// served through the [`LiveRegistry`]; the remaining live URIs
41/// (`Ticker`, `Trades`) still return [`AdapterError::Internal`]
42/// until v0.3-03 / v0.3-04 wire them.
43#[derive(Debug, Clone, PartialEq, Eq, Hash)]
44pub enum ResourceUri {
45    /// `deribit://currencies` — the currency catalogue (static).
46    Currencies,
47    /// `deribit://instruments/{currency}` — instruments for a currency
48    /// (static).
49    Instruments {
50        /// Currency symbol, upper-case (`BTC`, `ETH`, …).
51        currency: String,
52    },
53    /// `deribit://book/{instrument}` — order book (live, v0.3+).
54    Book {
55        /// Instrument name (`BTC-PERPETUAL`, …).
56        instrument: String,
57    },
58    /// `deribit://ticker/{instrument}` — ticker (live, v0.3+).
59    Ticker {
60        /// Instrument name.
61        instrument: String,
62    },
63    /// `deribit://trades/{instrument}` — last trades (live, v0.3+).
64    Trades {
65        /// Instrument name.
66        instrument: String,
67    },
68}
69
70impl ResourceUri {
71    /// Render back to the canonical `deribit://...` string form.
72    #[must_use]
73    pub fn to_uri(&self) -> String {
74        match self {
75            Self::Currencies => "deribit://currencies".to_string(),
76            Self::Instruments { currency } => format!("deribit://instruments/{currency}"),
77            Self::Book { instrument } => format!("deribit://book/{instrument}"),
78            Self::Ticker { instrument } => format!("deribit://ticker/{instrument}"),
79            Self::Trades { instrument } => format!("deribit://trades/{instrument}"),
80        }
81    }
82}
83
84/// `deribit://` URI scheme.
85const SCHEME: &str = "deribit://";
86
87/// Parse a `deribit://...` URI into [`ResourceUri`].
88///
89/// # Errors
90///
91/// Returns [`AdapterError::Validation`] for any input that does not
92/// match a documented template.
93pub fn parse_resource_uri(s: &str) -> Result<ResourceUri, AdapterError> {
94    let rest = s
95        .strip_prefix(SCHEME)
96        .ok_or_else(|| AdapterError::validation("uri", format!("not a `{SCHEME}` URI: {s}")))?;
97    if rest.is_empty() {
98        return Err(AdapterError::validation("uri", "empty resource path"));
99    }
100    let mut segments = rest.splitn(2, '/');
101    let head = segments
102        .next()
103        .ok_or_else(|| AdapterError::validation("uri", "missing resource head"))?;
104    // Treat a trailing-slash tail (`deribit://instruments/`) as
105    // "missing", not as an empty currency/instrument segment, so the
106    // error reported is the documented `field: "uri"` shape.
107    let tail = segments.next().filter(|s| !s.is_empty());
108
109    match (head, tail) {
110        ("currencies", None) => Ok(ResourceUri::Currencies),
111        ("currencies", Some(_)) => Err(AdapterError::validation(
112            "uri",
113            "`deribit://currencies` takes no path",
114        )),
115        ("instruments", Some(currency)) => {
116            let currency = parse_currency(currency)?;
117            Ok(ResourceUri::Instruments { currency })
118        }
119        ("instruments", None) => Err(AdapterError::validation(
120            "uri",
121            "`deribit://instruments/{currency}` requires a currency",
122        )),
123        ("book", Some(instrument)) => {
124            let instrument = parse_instrument_name(instrument)?;
125            Ok(ResourceUri::Book { instrument })
126        }
127        ("ticker", Some(instrument)) => {
128            let instrument = parse_instrument_name(instrument)?;
129            Ok(ResourceUri::Ticker { instrument })
130        }
131        ("trades", Some(instrument)) => {
132            let instrument = parse_instrument_name(instrument)?;
133            Ok(ResourceUri::Trades { instrument })
134        }
135        ("book" | "ticker" | "trades", None) => Err(AdapterError::validation(
136            "uri",
137            format!("`deribit://{head}/{{instrument}}` requires an instrument"),
138        )),
139        (other, _) => Err(AdapterError::validation(
140            "uri",
141            format!("unknown resource head: `{other}`"),
142        )),
143    }
144}
145
146/// Validate a Deribit currency segment (`BTC`, `ETH`, …).
147///
148/// Deribit currency symbols are short ASCII upper-case identifiers.
149/// We accept 1..=8 chars of `[A-Z0-9_]` to match what the upstream
150/// `deribit-http` crate sees in practice; the upstream call enforces
151/// the canonical set.
152fn parse_currency(s: &str) -> Result<String, AdapterError> {
153    if s.is_empty() || s.len() > 8 {
154        return Err(AdapterError::validation(
155            "currency",
156            format!("expected 1..=8 chars, got {} for `{s}`", s.len()),
157        ));
158    }
159    if !s
160        .chars()
161        .all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '_')
162    {
163        return Err(AdapterError::validation(
164            "currency",
165            format!("expected `[A-Z0-9_]`, got `{s}`"),
166        ));
167    }
168    Ok(s.to_string())
169}
170
171/// Validate an instrument name segment (`BTC-PERPETUAL`,
172/// `BTC-31MAY24-50000-C`, …).
173///
174/// Deribit instrument names are dash-separated ASCII upper-case tokens.
175/// Light-touch validation: 1..=64 chars of `[A-Z0-9_-]`. The upstream
176/// HTTP / WebSocket call enforces semantic shape.
177fn parse_instrument_name(s: &str) -> Result<String, AdapterError> {
178    if s.is_empty() || s.len() > 64 {
179        return Err(AdapterError::validation(
180            "instrument",
181            format!("expected 1..=64 chars, got {} for `{s}`", s.len()),
182        ));
183    }
184    if !s
185        .chars()
186        .all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '-' || c == '_')
187    {
188        return Err(AdapterError::validation(
189            "instrument",
190            format!("expected `[A-Z0-9_-]`, got `{s}`"),
191        ));
192    }
193    Ok(s.to_string())
194}
195
196/// Snapshot of `resources/list` and `resources/templates/list`
197/// produced by the registry.
198#[derive(Debug, Default, Clone)]
199pub struct ResourceList {
200    /// Concrete resources (e.g. `deribit://currencies`).
201    pub resources: Vec<Resource>,
202    /// URI templates (e.g. `deribit://book/{instrument}`).
203    pub templates: Vec<ResourceTemplate>,
204}
205
206/// Body returned by [`ResourceRegistry::read`].
207///
208/// v0.1-07 ships only [`Self::Json`] (JSON payloads from
209/// `deribit-http`). The variant set is closed; new transports add
210/// new variants.
211#[derive(Debug, Clone, PartialEq)]
212pub enum ResourceContent {
213    /// JSON body produced by an upstream HTTP read. The MIME type
214    /// surfaced to MCP is `application/json`.
215    Json(serde_json::Value),
216}
217
218/// Registry of MCP resources the server exposes.
219#[derive(Clone)]
220pub struct ResourceRegistry {
221    list: ResourceList,
222    live: LiveRegistry,
223    /// Optional provider for live subscriptions. `None` until the
224    /// upstream `deribit-websocket` provider is configured (default
225    /// for v0.1 / anonymous contexts); reads on live URIs return
226    /// `AdapterError::Internal` when missing.
227    provider: Option<Arc<dyn SubscriptionProvider>>,
228}
229
230impl std::fmt::Debug for ResourceRegistry {
231    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232        f.debug_struct("ResourceRegistry")
233            .field("list", &self.list)
234            .field("live", &self.live)
235            .field(
236                "provider",
237                &self.provider.as_ref().map(|_| "<dyn SubscriptionProvider>"),
238            )
239            .finish()
240    }
241}
242
243impl Default for ResourceRegistry {
244    fn default() -> Self {
245        Self {
246            list: ResourceList::default(),
247            live: LiveRegistry::new(),
248            provider: None,
249        }
250    }
251}
252
253/// Per-call deadline waiting for the first frame on a fresh
254/// subscription. Bounded so a stalled upstream cannot turn a
255/// `resources/read` into an unbounded await.
256const FIRST_FRAME_TIMEOUT: Duration = Duration::from_secs(5);
257
258impl ResourceRegistry {
259    /// Construct an empty registry.
260    #[must_use]
261    pub fn new() -> Self {
262        Self::default()
263    }
264
265    /// Replace the live-subscription provider. The real upstream
266    /// `deribit-websocket` provider is wired by the binary
267    /// startup path (v0.3-02 / -03 / -04); tests pass a stub
268    /// implementation.
269    #[must_use]
270    pub fn with_subscription_provider(mut self, provider: Arc<dyn SubscriptionProvider>) -> Self {
271        self.provider = Some(provider);
272        self
273    }
274
275    /// Build the v0.1 catalogue.
276    ///
277    /// Concrete entries: `deribit://currencies`. The
278    /// per-currency `deribit://instruments/{currency}` resources are
279    /// only knowable after a live `get_currencies` call (resolved in
280    /// v0.1-12); they live as a template until then.
281    ///
282    /// Templates: `deribit://instruments/{currency}`,
283    /// `deribit://book/{instrument}`,
284    /// `deribit://ticker/{instrument}`,
285    /// `deribit://trades/{instrument}`. The last three are accepted
286    /// by the parser but `read()` returns
287    /// [`AdapterError::Validation`] until v0.3 wires the live
288    /// transport.
289    #[must_use]
290    pub fn build() -> Self {
291        let mut list = ResourceList::default();
292        list.resources.push(make_resource(
293            "deribit://currencies",
294            "Deribit currency catalogue",
295            "Static list of Deribit currency symbols and metadata.",
296        ));
297        list.templates.push(make_template(
298            "deribit://instruments/{currency}",
299            "Deribit instruments by currency",
300            "Static list of instruments for a given currency.",
301        ));
302        list.templates.push(make_template(
303            "deribit://book/{instrument}",
304            "Deribit order book (live)",
305            "Order book snapshots from the `book.<instrument>.raw` \
306             channel. Read returns the latest decoded BookSnapshot \
307             when a SubscriptionProvider is configured; otherwise \
308             AdapterError::Internal.",
309        ));
310        list.templates.push(make_template(
311            "deribit://ticker/{instrument}",
312            "Deribit ticker (live)",
313            "Throttled ticker snapshots from the \
314             `ticker.<instrument>.100ms` channel. Read returns the \
315             latest TickerSnapshot when a SubscriptionProvider is \
316             configured; otherwise AdapterError::Internal.",
317        ));
318        list.templates.push(make_template(
319            "deribit://trades/{instrument}",
320            "Deribit last trades (live)",
321            "Trade events from the `trades.<instrument>.raw` channel. \
322             Read returns the most recent N TradeUpdate values \
323             (newest first) when a SubscriptionProvider is configured; \
324             otherwise AdapterError::Internal.",
325        ));
326        Self {
327            list,
328            live: LiveRegistry::new(),
329            provider: None,
330        }
331    }
332
333    /// Snapshot the registered resources.
334    #[must_use]
335    pub fn resources(&self) -> Vec<Resource> {
336        self.list.resources.clone()
337    }
338
339    /// Snapshot the registered resource templates.
340    #[must_use]
341    pub fn templates(&self) -> Vec<ResourceTemplate> {
342        self.list.templates.clone()
343    }
344
345    /// Snapshot of the full `resources/list` + templates payload.
346    #[must_use]
347    pub fn list(&self) -> ResourceList {
348        self.list.clone()
349    }
350
351    /// Whether the registry has any entries or templates.
352    #[must_use]
353    pub fn is_empty(&self) -> bool {
354        self.list.resources.is_empty() && self.list.templates.is_empty()
355    }
356
357    /// Read a resource by its parsed URI.
358    ///
359    /// v0.1-12 routes static URIs (`Currencies`, `Instruments`) to
360    /// [`static_::read_currencies`] / [`static_::read_instruments`].
361    /// Live URIs (`Book`, `Ticker`, `Trades`) return a structured
362    /// [`AdapterError::Internal`] with `reason: "live resources land
363    /// in v0.3"` so the LLM sees a stable error shape — the live
364    /// transport ships in v0.3 (ADR-0006).
365    ///
366    /// # Errors
367    ///
368    /// Static reads surface whatever upstream HTTP failure the call
369    /// produces (network, rate-limit, API). Live reads return
370    /// [`AdapterError::Internal`] until v0.3 ships.
371    pub async fn read(
372        &self,
373        ctx: &AdapterContext,
374        uri: &ResourceUri,
375    ) -> Result<ResourceContent, AdapterError> {
376        match uri {
377            ResourceUri::Currencies => {
378                Ok(ResourceContent::Json(static_::read_currencies(ctx).await?))
379            }
380            ResourceUri::Instruments { currency } => Ok(ResourceContent::Json(
381                static_::read_instruments(ctx, currency).await?,
382            )),
383            ResourceUri::Book { instrument } => {
384                let value = self.read_live(uri).await?;
385                let book = BookSnapshot::from_value(instrument, &value)?;
386                Ok(ResourceContent::Json(serde_json::to_value(&book)?))
387            }
388            ResourceUri::Ticker { instrument } => {
389                let value = self.read_live(uri).await?;
390                let ticker = TickerSnapshot::from_value(instrument, &value)?;
391                Ok(ResourceContent::Json(serde_json::to_value(&ticker)?))
392            }
393            ResourceUri::Trades { .. } => {
394                // Subscribe (so the reader task is running) and
395                // hand back the rolling history of decoded
396                // trades. Each upstream frame is an array of
397                // trade objects; we flatten across frames and
398                // sort by timestamp newest-first so the LLM
399                // sees a deterministic chronological list,
400                // capped at `HISTORY_CAPACITY`.
401                use tokio::sync::broadcast::error::RecvError;
402                let provider = self.provider.as_ref().ok_or_else(|| {
403                    AdapterError::internal("live subscription provider not configured")
404                })?;
405                let handle = self.live.subscribe(provider.as_ref(), uri).await?;
406                let mut updates = handle.updates();
407                if handle.latest().await.is_none() {
408                    match tokio::time::timeout(FIRST_FRAME_TIMEOUT, updates.recv()).await {
409                        Ok(Ok(())) | Ok(Err(RecvError::Lagged(_))) => {}
410                        Ok(Err(RecvError::Closed)) => {
411                            return Err(AdapterError::internal(
412                                "live subscription closed before producing a frame",
413                            ));
414                        }
415                        Err(_elapsed) => {
416                            return Err(AdapterError::internal(
417                                "live subscription did not produce a frame in time",
418                            ));
419                        }
420                    }
421                }
422                let mut trades: Vec<TradeUpdate> = Vec::new();
423                for frame in handle.history().await {
424                    // Propagate decode errors rather than silently
425                    // dropping a malformed frame — an upstream
426                    // contract change should surface, not vanish.
427                    let mut decoded = TradeUpdate::batch_from_value(&frame)?;
428                    trades.append(&mut decoded);
429                }
430                trades.sort_by_key(|t| std::cmp::Reverse(t.timestamp));
431                trades.truncate(live::HISTORY_CAPACITY);
432                Ok(ResourceContent::Json(serde_json::to_value(&trades)?))
433            }
434        }
435    }
436
437    /// Subscribe to a live URI and return the latest cached
438    /// snapshot, waiting up to [`FIRST_FRAME_TIMEOUT`] on a fresh
439    /// subscription. The returned `SubscriptionHandle` is dropped at
440    /// the end of the call — the underlying entry stays open as long
441    /// as any other subscriber holds a handle, otherwise the
442    /// refcount returns to zero and the upstream channel closes.
443    async fn read_live(&self, uri: &ResourceUri) -> Result<serde_json::Value, AdapterError> {
444        use tokio::sync::broadcast::error::RecvError;
445
446        let provider = self
447            .provider
448            .as_ref()
449            .ok_or_else(|| AdapterError::internal("live subscription provider not configured"))?;
450        let handle = self.live.subscribe(provider.as_ref(), uri).await?;
451        // Subscribe to the broadcast BEFORE checking `latest`. If
452        // the first frame arrives between an early `latest` check
453        // and the `updates()` subscription, the broadcast receiver
454        // would miss its signal (no backlog) and we would time out
455        // spuriously.
456        let mut updates = handle.updates();
457        if let Some(snapshot) = handle.latest().await {
458            return Ok(snapshot);
459        }
460        match tokio::time::timeout(FIRST_FRAME_TIMEOUT, updates.recv()).await {
461            Ok(Ok(())) => handle
462                .latest()
463                .await
464                .ok_or_else(|| AdapterError::internal("update fired without snapshot")),
465            Ok(Err(RecvError::Lagged(_))) => handle
466                .latest()
467                .await
468                .ok_or_else(|| AdapterError::internal("broadcast lagged before first frame")),
469            Ok(Err(RecvError::Closed)) => Err(AdapterError::internal(
470                "live subscription closed before producing a frame",
471            )),
472            Err(_elapsed) => Err(AdapterError::internal(
473                "live subscription did not produce a frame in time",
474            )),
475        }
476    }
477}
478
479fn make_resource(uri: &'static str, name: &'static str, description: &'static str) -> Resource {
480    let raw = RawResource {
481        uri: uri.to_string(),
482        name: name.to_string(),
483        title: None,
484        description: Some(description.to_string()),
485        mime_type: Some("application/json".to_string()),
486        size: None,
487        icons: None,
488        meta: None,
489    };
490    Annotated {
491        raw,
492        annotations: None,
493    }
494}
495
496fn make_template(
497    template: &'static str,
498    name: &'static str,
499    description: &'static str,
500) -> ResourceTemplate {
501    let raw = RawResourceTemplate {
502        uri_template: template.to_string(),
503        name: name.to_string(),
504        title: None,
505        description: Some(description.to_string()),
506        mime_type: Some("application/json".to_string()),
507        icons: None,
508    };
509    Annotated {
510        raw,
511        annotations: None,
512    }
513}
514
515#[cfg(test)]
516mod tests {
517    use super::*;
518
519    #[test]
520    fn parses_currencies() {
521        assert_eq!(
522            parse_resource_uri("deribit://currencies").unwrap(),
523            ResourceUri::Currencies
524        );
525    }
526
527    #[test]
528    fn parses_instruments_with_currency() {
529        assert_eq!(
530            parse_resource_uri("deribit://instruments/BTC").unwrap(),
531            ResourceUri::Instruments {
532                currency: "BTC".to_string()
533            }
534        );
535    }
536
537    #[test]
538    fn parses_book_template() {
539        assert_eq!(
540            parse_resource_uri("deribit://book/BTC-PERPETUAL").unwrap(),
541            ResourceUri::Book {
542                instrument: "BTC-PERPETUAL".to_string()
543            }
544        );
545    }
546
547    #[test]
548    fn parses_ticker_and_trades() {
549        assert!(matches!(
550            parse_resource_uri("deribit://ticker/ETH-PERPETUAL").unwrap(),
551            ResourceUri::Ticker { .. }
552        ));
553        assert!(matches!(
554            parse_resource_uri("deribit://trades/BTC-31MAY24-50000-C").unwrap(),
555            ResourceUri::Trades { .. }
556        ));
557    }
558
559    #[test]
560    fn rejects_non_deribit_scheme() {
561        let err = parse_resource_uri("foo://bar").unwrap_err();
562        match err {
563            AdapterError::Validation { field, .. } => assert_eq!(field, "uri"),
564            other => panic!("unexpected: {other:?}"),
565        }
566    }
567
568    #[test]
569    fn rejects_currencies_with_path() {
570        let err = parse_resource_uri("deribit://currencies/extra").unwrap_err();
571        assert!(matches!(err, AdapterError::Validation { .. }));
572    }
573
574    #[test]
575    fn rejects_instruments_without_currency() {
576        let err = parse_resource_uri("deribit://instruments/").unwrap_err();
577        assert!(matches!(err, AdapterError::Validation { .. }));
578    }
579
580    #[test]
581    fn rejects_unknown_head() {
582        let err = parse_resource_uri("deribit://options/BTC").unwrap_err();
583        assert!(matches!(err, AdapterError::Validation { .. }));
584    }
585
586    #[test]
587    fn rejects_lowercase_currency() {
588        let err = parse_resource_uri("deribit://instruments/btc").unwrap_err();
589        match err {
590            AdapterError::Validation { field, .. } => assert_eq!(field, "currency"),
591            other => panic!("unexpected: {other:?}"),
592        }
593    }
594
595    #[test]
596    fn rejects_overlong_instrument() {
597        let long = "X".repeat(65);
598        let uri = format!("deribit://book/{long}");
599        let err = parse_resource_uri(&uri).unwrap_err();
600        match err {
601            AdapterError::Validation { field, .. } => assert_eq!(field, "instrument"),
602            other => panic!("unexpected: {other:?}"),
603        }
604    }
605
606    #[test]
607    fn round_trip_to_uri() {
608        for original in [
609            "deribit://currencies",
610            "deribit://instruments/BTC",
611            "deribit://book/BTC-PERPETUAL",
612            "deribit://ticker/ETH-PERPETUAL",
613            "deribit://trades/BTC-31MAY24-50000-C",
614        ] {
615            let parsed = parse_resource_uri(original).unwrap();
616            assert_eq!(parsed.to_uri(), original);
617        }
618    }
619
620    #[test]
621    fn registry_build_lists_static_currency_entry() {
622        let r = ResourceRegistry::build();
623        assert_eq!(r.resources().len(), 1);
624        assert_eq!(r.resources()[0].raw.uri, "deribit://currencies");
625    }
626
627    #[test]
628    fn registry_build_lists_four_templates() {
629        let r = ResourceRegistry::build();
630        let templates = r.templates();
631        assert_eq!(templates.len(), 4);
632        let uris: Vec<&str> = templates
633            .iter()
634            .map(|t| t.raw.uri_template.as_str())
635            .collect();
636        assert!(uris.contains(&"deribit://instruments/{currency}"));
637        assert!(uris.contains(&"deribit://book/{instrument}"));
638        assert!(uris.contains(&"deribit://ticker/{instrument}"));
639        assert!(uris.contains(&"deribit://trades/{instrument}"));
640    }
641
642    fn ctx() -> AdapterContext {
643        use crate::config::{Config, LogFormat, OrderTransport, Transport};
644        use std::net::SocketAddr;
645        use std::sync::Arc;
646        let cfg = Config {
647            endpoint: "https://test.deribit.com".to_string(),
648            client_id: None,
649            client_secret: None,
650            allow_trading: false,
651            max_order_usd: None,
652            transport: Transport::Stdio,
653            http_listen: SocketAddr::from(([127, 0, 0, 1], 8723)),
654            http_bearer_token: None,
655            log_format: LogFormat::Text,
656            order_transport: OrderTransport::Http,
657        };
658        AdapterContext::new(Arc::new(cfg)).expect("ctx")
659    }
660
661    #[tokio::test]
662    async fn read_live_book_uses_provider_and_returns_snapshot() {
663        use crate::resources::live::{SubscriptionProvider, SubscriptionStream};
664        use std::future::Future;
665        use std::pin::Pin;
666        use std::sync::Arc;
667        use std::sync::atomic::{AtomicU64, Ordering};
668
669        /// Counts every `subscribe()` call so the test can pin the
670        /// "open exactly once across multiple reads" semantics.
671        struct CountingProvider {
672            opened: Arc<AtomicU64>,
673        }
674        impl SubscriptionProvider for CountingProvider {
675            fn subscribe(
676                &self,
677                _uri: ResourceUri,
678            ) -> Pin<Box<dyn Future<Output = Result<SubscriptionStream, AdapterError>> + Send + '_>>
679            {
680                self.opened.fetch_add(1, Ordering::AcqRel);
681                Box::pin(async move {
682                    let frame = serde_json::json!({
683                        "bids": [[50_000.0, 1.0], [49_999.0, 2.0]],
684                        "asks": [[50_001.0, 1.5]],
685                        "change_id": 42_u64,
686                        "timestamp": 1_700_000_000_000_i64,
687                    });
688                    let stream = futures_util::stream::iter(vec![Ok::<_, AdapterError>(frame)]);
689                    Ok(Box::pin(stream) as SubscriptionStream)
690                })
691            }
692        }
693
694        let opened = Arc::new(AtomicU64::new(0));
695        let provider = Arc::new(CountingProvider {
696            opened: opened.clone(),
697        });
698        let registry = ResourceRegistry::build().with_subscription_provider(provider);
699        let uri = ResourceUri::Book {
700            instrument: "BTC-PERPETUAL".to_string(),
701        };
702
703        let content = registry.read(&ctx(), &uri).await.expect("ok");
704        match content {
705            ResourceContent::Json(value) => {
706                assert_eq!(
707                    value.get("instrument").and_then(|v| v.as_str()),
708                    Some("BTC-PERPETUAL")
709                );
710                assert_eq!(value.get("change_id").and_then(|v| v.as_u64()), Some(42));
711                assert!(value.get("bids").and_then(|v| v.as_array()).is_some());
712            }
713        }
714
715        // Second read should hit the cached entry. The first read
716        // dropped its `SubscriptionHandle`, so the entry's refcount
717        // returned to zero and the deferred-cleanup task may have
718        // already removed it. To pin the semantics deterministically,
719        // assert the provider was opened *at most twice* — once
720        // for the first read, optionally once if the cleanup
721        // raced — and never more than that (no per-read leak).
722        let _ = registry.read(&ctx(), &uri).await.expect("ok");
723        let opens = opened.load(Ordering::Acquire);
724        assert!(
725            (1..=2).contains(&opens),
726            "expected provider opens in 1..=2, got {opens}"
727        );
728    }
729
730    #[tokio::test]
731    async fn read_live_book_without_provider_returns_internal() {
732        let registry = ResourceRegistry::build();
733        let uri = ResourceUri::Book {
734            instrument: "BTC-PERPETUAL".to_string(),
735        };
736        let err = registry.read(&ctx(), &uri).await.unwrap_err();
737        assert!(matches!(err, AdapterError::Internal { .. }));
738    }
739
740    #[tokio::test]
741    async fn read_live_trades_uses_provider_and_returns_chronological_history() {
742        use crate::resources::live::{SubscriptionProvider, SubscriptionStream};
743        use std::future::Future;
744        use std::pin::Pin;
745        use std::sync::Arc;
746
747        struct StubProvider;
748        impl SubscriptionProvider for StubProvider {
749            fn subscribe(
750                &self,
751                _uri: ResourceUri,
752            ) -> Pin<Box<dyn Future<Output = Result<SubscriptionStream, AdapterError>> + Send + '_>>
753            {
754                Box::pin(async move {
755                    // Two frames, each carrying a small batch.
756                    let frames = vec![
757                        Ok::<_, AdapterError>(serde_json::json!([
758                            {
759                                "direction": "buy",
760                                "price": 50_001.0,
761                                "amount": 1.0,
762                                "trade_id": "t1",
763                                "timestamp": 1_700_000_000_001_i64
764                            }
765                        ])),
766                        Ok::<_, AdapterError>(serde_json::json!([
767                            {
768                                "direction": "sell",
769                                "price": 50_002.0,
770                                "amount": 0.5,
771                                "trade_id": "t2",
772                                "timestamp": 1_700_000_000_002_i64,
773                                "liquidation": "M",
774                                "tick_direction": 1_i64,
775                                "mark_price": 50_001.5,
776                                "index_price": 50_010.0
777                            }
778                        ])),
779                    ];
780                    let stream = futures_util::stream::iter(frames);
781                    Ok(Box::pin(stream) as SubscriptionStream)
782                })
783            }
784        }
785
786        let registry = ResourceRegistry::build().with_subscription_provider(Arc::new(StubProvider));
787        let uri = ResourceUri::Trades {
788            instrument: "BTC-PERPETUAL".to_string(),
789        };
790        // Give the reader task a beat to drain both frames.
791        let _ = registry.read(&ctx(), &uri).await.expect("first ok");
792        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
793        let content = registry.read(&ctx(), &uri).await.expect("second ok");
794        let ResourceContent::Json(value) = content;
795        let trades = value.as_array().expect("array");
796        let ids: Vec<&str> = trades
797            .iter()
798            .filter_map(|t| t.get("trade_id").and_then(|v| v.as_str()))
799            .collect();
800        assert_eq!(
801            ids,
802            vec!["t2", "t1"],
803            "expected newest-first ordering; got {ids:?}"
804        );
805        let t2 = &trades[0];
806        assert_eq!(
807            t2.get("liquidation").and_then(|v| v.as_str()),
808            Some("M"),
809            "t2 should carry the liquidation marker"
810        );
811        assert_eq!(t2.get("tick_direction").and_then(|v| v.as_i64()), Some(1));
812        assert_eq!(
813            t2.get("mark_price").and_then(|v| v.as_f64()),
814            Some(50_001.5)
815        );
816        assert_eq!(
817            t2.get("index_price").and_then(|v| v.as_f64()),
818            Some(50_010.0)
819        );
820    }
821
822    #[tokio::test]
823    async fn read_live_ticker_uses_provider_and_returns_snapshot_with_greeks() {
824        use crate::resources::live::{SubscriptionProvider, SubscriptionStream};
825        use std::future::Future;
826        use std::pin::Pin;
827        use std::sync::Arc;
828
829        struct StubProvider;
830        impl SubscriptionProvider for StubProvider {
831            fn subscribe(
832                &self,
833                _uri: ResourceUri,
834            ) -> Pin<Box<dyn Future<Output = Result<SubscriptionStream, AdapterError>> + Send + '_>>
835            {
836                Box::pin(async move {
837                    let frame = serde_json::json!({
838                        "mark_price": 50_000.5,
839                        "index_price": 50_010.0,
840                        "best_bid_price": 50_000.0,
841                        "best_ask_price": 50_001.0,
842                        "last_price": 49_999.5,
843                        "mark_iv": 65.0,
844                        "greeks": {
845                            "delta": 0.55,
846                            "gamma": 0.0001,
847                            "vega": 12.3,
848                            "theta": -0.4,
849                            "rho": 0.05
850                        },
851                        "timestamp": 1_700_000_000_000_i64,
852                    });
853                    let stream = futures_util::stream::iter(vec![Ok::<_, AdapterError>(frame)]);
854                    Ok(Box::pin(stream) as SubscriptionStream)
855                })
856            }
857        }
858
859        let registry = ResourceRegistry::build().with_subscription_provider(Arc::new(StubProvider));
860        let uri = ResourceUri::Ticker {
861            instrument: "BTC-31MAY24-50000-C".to_string(),
862        };
863        let content = registry.read(&ctx(), &uri).await.expect("ok");
864        let ResourceContent::Json(value) = content;
865        assert_eq!(
866            value.get("instrument").and_then(|v| v.as_str()),
867            Some("BTC-31MAY24-50000-C")
868        );
869        assert_eq!(
870            value.get("mark_price").and_then(|v| v.as_f64()),
871            Some(50_000.5)
872        );
873        assert_eq!(value.get("delta").and_then(|v| v.as_f64()), Some(0.55));
874        assert_eq!(value.get("gamma").and_then(|v| v.as_f64()), Some(0.0001));
875        assert_eq!(value.get("vega").and_then(|v| v.as_f64()), Some(12.3));
876    }
877
878    #[tokio::test]
879    async fn read_live_ticker_perp_omits_greeks() {
880        use crate::resources::live::{SubscriptionProvider, SubscriptionStream};
881        use std::future::Future;
882        use std::pin::Pin;
883        use std::sync::Arc;
884
885        struct StubProvider;
886        impl SubscriptionProvider for StubProvider {
887            fn subscribe(
888                &self,
889                _uri: ResourceUri,
890            ) -> Pin<Box<dyn Future<Output = Result<SubscriptionStream, AdapterError>> + Send + '_>>
891            {
892                Box::pin(async move {
893                    let frame = serde_json::json!({
894                        "mark_price": 50_000.5,
895                        "best_bid_price": 50_000.0,
896                        "best_ask_price": 50_001.0,
897                        "last_price": 49_999.5,
898                        "timestamp": 1_700_000_000_000_i64,
899                    });
900                    let stream = futures_util::stream::iter(vec![Ok::<_, AdapterError>(frame)]);
901                    Ok(Box::pin(stream) as SubscriptionStream)
902                })
903            }
904        }
905
906        let registry = ResourceRegistry::build().with_subscription_provider(Arc::new(StubProvider));
907        let uri = ResourceUri::Ticker {
908            instrument: "BTC-PERPETUAL".to_string(),
909        };
910        let content = registry.read(&ctx(), &uri).await.expect("ok");
911        let ResourceContent::Json(value) = content;
912        // `skip_serializing_if = "Option::is_none"` keeps absent
913        // fields out of the JSON payload entirely.
914        assert!(value.get("delta").is_none());
915        assert!(value.get("gamma").is_none());
916        assert!(value.get("vega").is_none());
917        assert!(value.get("mark_iv").is_none());
918    }
919}