use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use futures_core::Stream;
use serde_json::Value;
use tokio::sync::{Mutex, RwLock, broadcast};
use tokio_util::sync::CancellationToken;
use super::ResourceUri;
use crate::error::AdapterError;
/// Boxed, pinned stream yielding `Result<Value, AdapterError>` items, as
/// returned by [`SubscriptionProvider::subscribe`].
///
/// The `Send + 'static` bounds allow the stream to be moved into a spawned task.
pub type SubscriptionStream =
Pin<Box<dyn Stream<Item = Result<Value, AdapterError>> + Send + 'static>>;
/// Opens upstream subscription streams for a [`ResourceUri`].
///
/// `LiveRegistry::subscribe` calls this when it finds no live entry for the
/// requested URI.
pub trait SubscriptionProvider: Send + Sync + 'static {
/// Opens a stream for `uri`.
///
/// Returns a boxed future (which may borrow `self`) resolving to the stream,
/// or to an [`AdapterError`] if the subscription could not be opened.
fn subscribe(
&self,
uri: ResourceUri,
) -> Pin<Box<dyn Future<Output = Result<SubscriptionStream, AdapterError>> + Send + '_>>;
}
/// Reference-counted handle onto a shared live subscription.
///
/// Handles are created by `LiveRegistry::subscribe`. Dropping a handle
/// decrements the entry's refcount; dropping the last one schedules removal of
/// the entry from the registry and cancellation of its pump task.
pub struct SubscriptionHandle {
/// Resource this handle is subscribed to.
uri: ResourceUri,
/// Shared per-URI state (latest value, history, update broadcast).
entry: Arc<SubscriptionEntry>,
/// Registry state that owns the entry map; used on drop to remove the entry.
registry: Arc<LiveRegistryShared>,
}
impl SubscriptionHandle {
    /// Returns the resource URI this handle was opened for.
    #[must_use]
    pub fn uri(&self) -> &ResourceUri {
        &self.uri
    }

    /// Returns a fresh receiver on the entry's update broadcast.
    ///
    /// Each received `()` signals that a new value was stored; fetch the value
    /// itself with [`latest`](Self::latest).
    #[must_use]
    pub fn updates(&self) -> broadcast::Receiver<()> {
        let sender = &self.entry.broadcast;
        sender.subscribe()
    }

    /// Returns a clone of the most recently stored value, or `None` if no
    /// value has been stored yet.
    pub async fn latest(&self) -> Option<Value> {
        let slot = self.entry.latest.lock().await;
        slot.as_ref().cloned()
    }

    /// Returns a copy of the buffered history in buffer order (front to back).
    pub async fn history(&self) -> Vec<Value> {
        let ring = self.entry.history.lock().await;
        let mut snapshot = Vec::with_capacity(ring.len());
        snapshot.extend(ring.iter().cloned());
        snapshot
    }
}
impl Drop for SubscriptionHandle {
fn drop(&mut self) {
let prev = self.entry.refcount.fetch_sub(1, Ordering::AcqRel);
assert!(
prev >= 1,
"live subscription refcount underflow on drop (uri = {:?})",
self.uri
);
if prev == 1 {
let registry = self.registry.clone();
let uri = self.uri.clone();
tokio::spawn(async move {
let mut map = registry.entries.write().await;
let still_zero = map
.get(&uri)
.is_some_and(|e| e.refcount.load(Ordering::Acquire) == 0);
if still_zero {
if let Some(entry) = map.remove(&uri) {
entry.cancel.cancel();
}
}
});
}
}
}
/// Shared state for one live upstream subscription, keyed by URI in the registry.
#[derive(Debug)]
pub struct SubscriptionEntry {
/// Channel name derived from the URI via `channel_name_for`.
pub channel: String,
/// Number of live `SubscriptionHandle`s attached to this entry.
pub refcount: AtomicU64,
/// Most recent value received from the upstream stream, if any.
pub latest: Mutex<Option<Value>>,
/// Recent values, newest at the front, capped at `HISTORY_CAPACITY`.
pub history: Mutex<VecDeque<Value>>,
/// Sends a unit signal each time a new value is stored.
pub broadcast: broadcast::Sender<()>,
/// Cancelling this token stops the entry's pump task.
pub cancel: CancellationToken,
/// Time of the last sink notification; used to throttle notifications.
last_notified: Mutex<Option<Instant>>,
}
/// Receiver of "resource updated" notifications from the registry.
///
/// `notify` is invoked synchronously from a subscription's pump task, throttled
/// per entry by the registry's notify interval.
pub trait NotificationSink: Send + Sync + 'static {
/// Signals that new data is available for `uri`.
fn notify(&self, uri: &ResourceUri);
}
/// Default minimum spacing between two sink notifications for the same entry.
pub const DEFAULT_NOTIFY_INTERVAL: Duration = Duration::from_millis(100);
/// Maximum number of values retained in each entry's history buffer.
pub const HISTORY_CAPACITY: usize = 32;
/// State shared between all clones of a `LiveRegistry`, its handles and pump tasks.
struct LiveRegistryShared {
/// Live entries keyed by resource URI.
entries: RwLock<HashMap<ResourceUri, Arc<SubscriptionEntry>>>,
/// Optional sink notified when an entry receives a new value.
sink: RwLock<Option<Arc<dyn NotificationSink>>>,
/// Minimum spacing between sink notifications per entry; zero disables throttling.
notify_interval: RwLock<Duration>,
}
impl Default for LiveRegistryShared {
fn default() -> Self {
Self {
entries: RwLock::default(),
sink: RwLock::new(None),
notify_interval: RwLock::new(DEFAULT_NOTIFY_INTERVAL),
}
}
}
impl std::fmt::Debug for LiveRegistryShared {
    /// Formats the struct with fixed placeholder strings for every field, so
    /// formatting never has to take any of the async locks.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("LiveRegistryShared");
        builder.field("entries", &"<HashMap>");
        builder.field("sink", &"<Option<dyn NotificationSink>>");
        builder.field("notify_interval", &"<RwLock<Duration>>");
        builder.finish()
    }
}
/// Registry of live subscriptions that shares one upstream stream per URI.
///
/// Cloning is cheap: clones share the same underlying state through an `Arc`.
#[derive(Debug, Default, Clone)]
pub struct LiveRegistry {
/// Shared state (entry map, notification sink, notify interval).
inner: Arc<LiveRegistryShared>,
}
impl LiveRegistry {
    /// Creates an empty registry with no notification sink and
    /// [`DEFAULT_NOTIFY_INTERVAL`] as the throttle interval.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Installs (`Some`) or clears (`None`) the sink notified on new values.
    pub async fn set_notification_sink(&self, sink: Option<Arc<dyn NotificationSink>>) {
        *self.inner.sink.write().await = sink;
    }

    /// Sets the minimum spacing between sink notifications for one entry.
    ///
    /// A zero interval disables throttling.
    pub async fn set_notify_interval(&self, interval: Duration) {
        *self.inner.notify_interval.write().await = interval;
    }

    /// Returns the currently configured notification interval.
    pub async fn notify_interval(&self) -> Duration {
        *self.inner.notify_interval.read().await
    }

    /// Returns the number of entries currently registered.
    pub async fn len(&self) -> usize {
        self.inner.entries.read().await.len()
    }

    /// Returns `true` when no entries are registered.
    pub async fn is_empty(&self) -> bool {
        self.inner.entries.read().await.is_empty()
    }

    /// Returns the number of live handles for `uri`, or `0` if it has no entry.
    pub async fn refcount(&self, uri: &ResourceUri) -> u64 {
        match self.inner.entries.read().await.get(uri) {
            Some(entry) => entry.refcount.load(Ordering::Acquire),
            None => 0,
        }
    }

    /// Subscribes to `uri`, reusing an existing live entry when there is one.
    ///
    /// If no live entry exists, opens an upstream stream through `provider`,
    /// registers a new entry, and spawns a pump task. The task copies each
    /// value into the entry's `latest` slot and `history` buffer, signals the
    /// update broadcast, and notifies the sink (throttled).
    ///
    /// # Errors
    ///
    /// Returns the provider's error if the upstream stream cannot be opened.
    ///
    /// # Panics
    ///
    /// Panics if the entry's refcount would overflow `u64`.
    pub async fn subscribe<P: SubscriptionProvider + ?Sized>(
        &self,
        provider: &P,
        uri: &ResourceUri,
    ) -> Result<SubscriptionHandle, AdapterError> {
        // Fast path: attach to a live entry while the read lock is held, so a
        // concurrent cleanup (which needs the write lock) cannot interleave.
        if let Some(entry) = self.inner.entries.read().await.get(uri).cloned() {
            if !entry.cancel.is_cancelled() {
                return Ok(self.attach(uri.clone(), entry));
            }
        }

        // Slow path: open upstream without holding any lock.
        let mut stream = provider.subscribe(uri.clone()).await?;
        let cancel = CancellationToken::new();
        let (broadcast_tx, _) = broadcast::channel::<()>(BROADCAST_CAPACITY);
        let entry = Arc::new(SubscriptionEntry {
            channel: channel_name_for(uri),
            refcount: AtomicU64::new(0),
            latest: Mutex::new(None),
            history: Mutex::new(VecDeque::with_capacity(HISTORY_CAPACITY)),
            broadcast: broadcast_tx,
            cancel: cancel.clone(),
            last_notified: Mutex::new(None),
        });

        let handle = {
            let mut map = self.inner.entries.write().await;
            if let Some(existing) = map.get(uri).cloned() {
                if !existing.cancel.is_cancelled() {
                    // Lost the race to another subscriber: discard our stream
                    // and share the entry that was registered first.
                    cancel.cancel();
                    drop(stream);
                    return Ok(self.attach(uri.clone(), existing));
                }
                map.remove(uri);
            }
            map.insert(uri.clone(), Arc::clone(&entry));
            // Attach while the write lock is still held. Otherwise the entry is
            // visible with refcount 0, and a concurrent cleanup task could
            // remove and cancel it before this caller receives its handle.
            self.attach(uri.clone(), Arc::clone(&entry))
        };

        let task_entry = entry;
        let task_uri = uri.clone();
        let task_shared = Arc::clone(&self.inner);
        // NOTE(review): when the upstream stream ends or errors, the pump exits
        // but the entry stays registered and un-cancelled, so later subscribers
        // attach to an entry that receives no further values. Confirm intended.
        tokio::spawn(async move {
            use futures_util::StreamExt;
            loop {
                tokio::select! {
                    // Check cancellation first so teardown wins over pending items.
                    biased;
                    _ = task_entry.cancel.cancelled() => break,
                    item = stream.next() => match item {
                        Some(Ok(value)) => {
                            {
                                // Newest value at the front; evict the oldest when full.
                                let mut history = task_entry.history.lock().await;
                                if history.len() == HISTORY_CAPACITY {
                                    history.pop_back();
                                }
                                history.push_front(value.clone());
                            }
                            *task_entry.latest.lock().await = Some(value);
                            // A send error only means there are no receivers right now.
                            let _ = task_entry.broadcast.send(());
                            maybe_notify(&task_entry, &task_uri, &task_shared).await;
                        }
                        Some(Err(err)) => {
                            tracing::warn!(error = %err, "live subscription stream error; closing");
                            break;
                        }
                        None => break,
                    },
                }
            }
        });
        Ok(handle)
    }

    /// Increments `entry`'s refcount and wraps it in a handle.
    ///
    /// Callers must hold a lock on the entry map, so that the increment cannot
    /// race with the cleanup scheduled by a dropped handle.
    fn attach(&self, uri: ResourceUri, entry: Arc<SubscriptionEntry>) -> SubscriptionHandle {
        let prev = entry.refcount.fetch_add(1, Ordering::AcqRel);
        assert!(
            prev.checked_add(1).is_some(),
            "live subscription refcount overflowed u64"
        );
        SubscriptionHandle {
            uri,
            entry,
            registry: Arc::clone(&self.inner),
        }
    }
}
async fn maybe_notify(entry: &SubscriptionEntry, uri: &ResourceUri, shared: &LiveRegistryShared) {
let sink = {
let guard = shared.sink.read().await;
match guard.as_ref() {
Some(sink) => sink.clone(),
None => return,
}
};
let interval = *shared.notify_interval.read().await;
let mut last = entry.last_notified.lock().await;
let now = Instant::now();
let should_emit = match *last {
_ if interval.is_zero() => true,
Some(prev) => now.saturating_duration_since(prev) >= interval,
None => true,
};
if !should_emit {
return;
}
*last = Some(now);
drop(last);
sink.notify(uri);
}
/// Maps a resource URI to its upstream channel name.
///
/// Book and trades channels use the `raw` suffix, ticker channels use `100ms`;
/// `Currencies` and `Instruments` have no suffix.
pub fn channel_name_for(uri: &ResourceUri) -> String {
    match uri {
        ResourceUri::Currencies => String::from("currencies"),
        ResourceUri::Instruments { currency: code } => format!("instruments.{code}"),
        ResourceUri::Book { instrument: name } => format!("book.{name}.raw"),
        ResourceUri::Ticker { instrument: name } => format!("ticker.{name}.100ms"),
        ResourceUri::Trades { instrument: name } => format!("trades.{name}.raw"),
    }
}
/// Typed view of an order-book payload for one instrument.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "snake_case")]
pub struct BookSnapshot {
/// Instrument name supplied by the caller of `from_value`.
pub instrument: String,
/// Bid levels as `(price, size)` pairs, in payload order.
pub bids: Vec<(f64, f64)>,
/// Ask levels as `(price, size)` pairs, in payload order.
pub asks: Vec<(f64, f64)>,
/// Payload `change_id`; `0` when absent or not an unsigned integer.
pub change_id: u64,
/// Payload `timestamp`; `0` when absent or not an integer.
pub timestamp: i64,
}
impl BookSnapshot {
    /// Decodes a book payload for `instrument` from a JSON object.
    ///
    /// Missing or malformed `bids`/`asks` decode to empty lists; missing or
    /// non-integer `change_id`/`timestamp` decode to `0`.
    ///
    /// # Errors
    ///
    /// Returns a validation error when `value` is not a JSON object.
    pub fn from_value(instrument: &str, value: &Value) -> Result<Self, AdapterError> {
        let Some(fields) = value.as_object() else {
            return Err(AdapterError::validation("book", "expected JSON object"));
        };
        let bids = decode_levels(fields.get("bids"))?;
        let asks = decode_levels(fields.get("asks"))?;
        let change_id = fields.get("change_id").and_then(Value::as_u64).unwrap_or(0);
        let timestamp = fields.get("timestamp").and_then(Value::as_i64).unwrap_or(0);
        Ok(Self {
            instrument: instrument.to_owned(),
            bids,
            asks,
            change_id,
            timestamp,
        })
    }
}
/// Typed view of a ticker payload for one instrument.
///
/// Every numeric field is optional and is omitted from serialized output when `None`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "snake_case")]
pub struct TickerSnapshot {
/// Instrument name supplied by the caller of `from_value`.
pub instrument: String,
/// Top-level `mark_price`, if present and numeric.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mark_price: Option<f64>,
/// Top-level `index_price`, if present and numeric.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub index_price: Option<f64>,
/// Top-level `best_bid_price`, if present and numeric.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub best_bid_price: Option<f64>,
/// Top-level `best_ask_price`, if present and numeric.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub best_ask_price: Option<f64>,
/// Top-level `last_price`, if present and numeric.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_price: Option<f64>,
/// Top-level `mark_iv`, if present and numeric.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mark_iv: Option<f64>,
/// `delta` from the nested `greeks` object, if present and numeric.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub delta: Option<f64>,
/// `gamma` from the nested `greeks` object, if present and numeric.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub gamma: Option<f64>,
/// `vega` from the nested `greeks` object, if present and numeric.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub vega: Option<f64>,
/// Top-level `timestamp`, if present and an integer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub timestamp: Option<i64>,
}
impl TickerSnapshot {
    /// Decodes a ticker payload for `instrument` from a JSON object.
    ///
    /// Prices and `mark_iv` are read from the top level; `delta`, `gamma` and
    /// `vega` are read from the nested `greeks` object. Anything missing or of
    /// the wrong JSON type becomes `None`.
    ///
    /// # Errors
    ///
    /// Returns a validation error when `value` is not a JSON object.
    pub fn from_value(instrument: &str, value: &Value) -> Result<Self, AdapterError> {
        let Some(fields) = value.as_object() else {
            return Err(AdapterError::validation("ticker", "expected JSON object"));
        };
        let greeks = fields.get("greeks").and_then(Value::as_object);

        // Numeric lookup at the top level of the payload.
        let top = |key: &str| -> Option<f64> { fields.get(key)?.as_f64() };
        // Numeric lookup inside the optional `greeks` object.
        let nested = |key: &str| -> Option<f64> { greeks?.get(key)?.as_f64() };

        let snapshot = Self {
            instrument: instrument.to_owned(),
            mark_price: top("mark_price"),
            index_price: top("index_price"),
            best_bid_price: top("best_bid_price"),
            best_ask_price: top("best_ask_price"),
            last_price: top("last_price"),
            mark_iv: top("mark_iv"),
            delta: nested("delta"),
            gamma: nested("gamma"),
            vega: nested("vega"),
            timestamp: fields.get("timestamp").and_then(Value::as_i64),
        };
        Ok(snapshot)
    }
}
/// Typed view of a single trade from a trades payload.
///
/// Optional fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "snake_case")]
pub struct TradeUpdate {
/// Payload `direction` string (required).
pub direction: String,
/// Payload `price` (required).
pub price: f64,
/// Payload `amount` (required).
pub amount: f64,
/// Payload `trade_id`; accepted as a JSON string or unsigned integer and stored as text.
pub trade_id: String,
/// Payload `timestamp` (required).
pub timestamp: i64,
/// Payload `liquidation` string, if present.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub liquidation: Option<String>,
/// Payload `tick_direction`, if present and an integer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tick_direction: Option<i64>,
/// Payload `mark_price`, if present and numeric.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mark_price: Option<f64>,
/// Payload `index_price`, if present and numeric.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub index_price: Option<f64>,
}
impl TradeUpdate {
    /// Decodes one trade object.
    ///
    /// Returns `None` when `value` is not an object or when any required field
    /// (`direction`, `price`, `amount`, `trade_id`, `timestamp`) is missing or
    /// has the wrong JSON type.
    fn from_value(value: &Value) -> Option<Self> {
        let fields = value.as_object()?;
        let text = |key: &str| -> Option<String> { Some(fields.get(key)?.as_str()?.to_string()) };
        let number = |key: &str| -> Option<f64> { fields.get(key)?.as_f64() };
        let integer = |key: &str| -> Option<i64> { fields.get(key)?.as_i64() };

        let direction = text("direction")?;
        let price = number("price")?;
        let amount = number("amount")?;
        // `trade_id` may arrive as a string or as an unsigned integer.
        let trade_id = match fields.get("trade_id")? {
            Value::String(id) => id.clone(),
            other => other.as_u64()?.to_string(),
        };
        let timestamp = integer("timestamp")?;

        Some(Self {
            direction,
            price,
            amount,
            trade_id,
            timestamp,
            liquidation: text("liquidation"),
            tick_direction: integer("tick_direction"),
            mark_price: number("mark_price"),
            index_price: number("index_price"),
        })
    }

    /// Decodes a JSON array of trades, silently skipping elements that fail to decode.
    ///
    /// # Errors
    ///
    /// Returns a validation error when `value` is not a JSON array.
    pub fn batch_from_value(value: &Value) -> Result<Vec<Self>, AdapterError> {
        let Some(items) = value.as_array() else {
            return Err(AdapterError::validation("trades", "expected JSON array"));
        };
        let mut trades = Vec::new();
        for item in items {
            if let Some(trade) = Self::from_value(item) {
                trades.push(trade);
            }
        }
        Ok(trades)
    }
}
/// Decodes an array of book levels into `(price, size)` pairs.
///
/// Each level is either `[price, size]` or `[op, price, size]`; the leading
/// element of the three-item form is ignored. Levels of any other shape, or
/// with a non-numeric price or size, are skipped. A missing or non-array input
/// yields an empty list. This function currently never returns `Err`.
fn decode_levels(value: Option<&Value>) -> Result<Vec<(f64, f64)>, AdapterError> {
    let levels = match value.and_then(Value::as_array) {
        Some(levels) => levels,
        None => return Ok(Vec::new()),
    };
    let mut decoded = Vec::with_capacity(levels.len());
    decoded.extend(levels.iter().filter_map(|level| {
        let (price, size) = match level.as_array()?.as_slice() {
            [_, price, size] | [price, size] => (price, size),
            _ => return None,
        };
        Some((price.as_f64()?, size.as_f64()?))
    }));
    Ok(decoded)
}
/// Capacity of each entry's update-signal broadcast channel.
const BROADCAST_CAPACITY: usize = 64;
// Unit tests for the live registry: refcounting, entry reuse and teardown,
// update broadcast, sink throttling, and channel naming.
#[cfg(test)]
mod tests {
use super::*;
use futures_util::stream;
use std::sync::Mutex as StdMutex;
use std::time::Duration;
// Provider that replays a fixed list of values per URI and counts how many
// times `subscribe` was called.
#[derive(Default)]
struct StubProvider {
items: StdMutex<HashMap<ResourceUri, Vec<Value>>>,
opened: AtomicU64,
}
impl StubProvider {
// Builds a provider that serves `items` for `uri` and nothing for other URIs.
fn with(uri: ResourceUri, items: Vec<Value>) -> Self {
let p = StubProvider::default();
p.items.lock().unwrap().insert(uri, items);
p
}
}
impl SubscriptionProvider for StubProvider {
fn subscribe(
&self,
uri: ResourceUri,
) -> Pin<Box<dyn Future<Output = Result<SubscriptionStream, AdapterError>> + Send + '_>>
{
// Count the open before building the (finite) replay stream.
self.opened.fetch_add(1, Ordering::AcqRel);
let items = self
.items
.lock()
.unwrap()
.get(&uri)
.cloned()
.unwrap_or_default();
Box::pin(async move {
let s = stream::iter(items.into_iter().map(Ok::<_, AdapterError>));
Ok(Box::pin(s) as SubscriptionStream)
})
}
}
// URI used by most tests.
fn book_btc() -> ResourceUri {
ResourceUri::Book {
instrument: "BTC-PERPETUAL".to_string(),
}
}
// The first subscribe for a URI opens upstream once and yields refcount 1.
#[tokio::test]
async fn first_subscribe_opens_upstream() {
let provider = StubProvider::with(book_btc(), vec![serde_json::json!({"snap": 1})]);
let registry = LiveRegistry::new();
let _handle = registry
.subscribe(&provider, &book_btc())
.await
.expect("subscribe");
assert_eq!(registry.refcount(&book_btc()).await, 1);
assert_eq!(provider.opened.load(Ordering::Acquire), 1);
}
// A second subscribe for the same URI shares the entry instead of reopening upstream.
#[tokio::test]
async fn second_subscribe_reuses_entry() {
let provider = StubProvider::with(book_btc(), vec![]);
let registry = LiveRegistry::new();
let _h1 = registry.subscribe(&provider, &book_btc()).await.unwrap();
let _h2 = registry.subscribe(&provider, &book_btc()).await.unwrap();
assert_eq!(registry.refcount(&book_btc()).await, 2);
assert_eq!(
provider.opened.load(Ordering::Acquire),
1,
"upstream should have been opened exactly once"
);
}
// Dropping handles decrements the refcount; dropping the last one removes the entry.
#[tokio::test]
async fn dropping_last_handle_closes_upstream_and_removes_entry() {
let provider = StubProvider::with(book_btc(), vec![]);
let registry = LiveRegistry::new();
let h1 = registry.subscribe(&provider, &book_btc()).await.unwrap();
let h2 = registry.subscribe(&provider, &book_btc()).await.unwrap();
assert_eq!(registry.refcount(&book_btc()).await, 2);
drop(h2);
assert_eq!(registry.refcount(&book_btc()).await, 1);
drop(h1);
// Removal happens in a spawned task, so poll until it has run.
for _ in 0..20 {
if registry.is_empty().await {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
assert!(registry.is_empty().await);
assert_eq!(registry.refcount(&book_btc()).await, 0);
}
// Values pushed by the provider show up in `latest()` and trigger an update signal.
#[tokio::test]
async fn updates_broadcast_fires_and_latest_carries_payload() {
// Provider whose stream yields nothing until `release` is notified, so the
// test can take its broadcast receiver before any value is sent.
struct GatedProvider {
release: Arc<tokio::sync::Notify>,
}
impl SubscriptionProvider for GatedProvider {
fn subscribe(
&self,
_uri: ResourceUri,
) -> Pin<Box<dyn Future<Output = Result<SubscriptionStream, AdapterError>> + Send + '_>>
{
let release = self.release.clone();
Box::pin(async move {
let s = async_stream::stream! {
release.notified().await;
yield Ok::<_, AdapterError>(serde_json::json!({"v": 1}));
yield Ok::<_, AdapterError>(serde_json::json!({"v": 2}));
};
Ok(Box::pin(s) as SubscriptionStream)
})
}
}
let release = Arc::new(tokio::sync::Notify::new());
let provider = GatedProvider {
release: release.clone(),
};
let registry = LiveRegistry::new();
let handle = registry.subscribe(&provider, &book_btc()).await.unwrap();
// Take the receiver before releasing the gate so no signal is missed.
let mut updates = handle.updates();
release.notify_one();
for _ in 0..50 {
if handle.latest().await.is_some() {
break;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
let latest = handle.latest().await.expect("latest set");
assert!(latest.get("v").is_some());
let signal = tokio::time::timeout(Duration::from_millis(500), updates.recv()).await;
assert!(signal.is_ok(), "expected at least one update signal");
}
// Sink that records every URI it is notified about.
#[derive(Default, Clone)]
struct CountingSink {
calls: Arc<std::sync::Mutex<Vec<ResourceUri>>>,
}
impl NotificationSink for CountingSink {
fn notify(&self, uri: &ResourceUri) {
self.calls.lock().unwrap().push(uri.clone());
}
}
// With throttling disabled (zero interval), the first value reaches the sink.
#[tokio::test]
async fn notification_sink_fires_after_first_frame() {
let provider = StubProvider::with(book_btc(), vec![serde_json::json!({"v": 1})]);
let registry = LiveRegistry::new();
let sink = CountingSink::default();
registry
.set_notification_sink(Some(Arc::new(sink.clone())))
.await;
registry.set_notify_interval(Duration::ZERO).await;
let _handle = registry.subscribe(&provider, &book_btc()).await.unwrap();
for _ in 0..50 {
if !sink.calls.lock().unwrap().is_empty() {
break;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
assert!(
!sink.calls.lock().unwrap().is_empty(),
"expected at least one notification"
);
}
// Values received while no sink is installed must not consume the throttle
// window for a sink installed later.
#[tokio::test]
async fn notification_throttle_does_not_advance_without_sink() {
let provider = StubProvider::with(book_btc(), vec![serde_json::json!({"v": 1})]);
let registry = LiveRegistry::new();
registry.set_notify_interval(Duration::from_secs(10)).await;
let handle = registry.subscribe(&provider, &book_btc()).await.unwrap();
// Wait for the first value to be processed without any sink installed.
for _ in 0..50 {
if handle.latest().await.is_some() {
break;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
let sink = CountingSink::default();
registry
.set_notification_sink(Some(Arc::new(sink.clone())))
.await;
// Tear the first entry down so the next subscribe opens a fresh one.
drop(handle);
for _ in 0..20 {
if registry.is_empty().await {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
let provider2 = StubProvider::with(book_btc(), vec![serde_json::json!({"v": 2})]);
let _h2 = registry.subscribe(&provider2, &book_btc()).await.unwrap();
for _ in 0..50 {
if !sink.calls.lock().unwrap().is_empty() {
break;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
assert!(
!sink.calls.lock().unwrap().is_empty(),
"sink should fire on the first frame after attach, regardless of \
frames received while detached"
);
}
// A burst of 100 values inside one throttle interval yields exactly one notification.
#[tokio::test]
async fn notification_throttle_coalesces_burst_into_single_emit() {
let frames: Vec<Value> = (0..100).map(|i| serde_json::json!({"v": i})).collect();
let provider = StubProvider::with(book_btc(), frames);
let registry = LiveRegistry::new();
let sink = CountingSink::default();
registry
.set_notification_sink(Some(Arc::new(sink.clone())))
.await;
registry.set_notify_interval(Duration::from_secs(1)).await;
let _handle = registry.subscribe(&provider, &book_btc()).await.unwrap();
for _ in 0..50 {
if !sink.calls.lock().unwrap().is_empty() {
break;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
// Give the rest of the burst time to drain before counting notifications.
tokio::time::sleep(Duration::from_millis(50)).await;
let n = sink.calls.lock().unwrap().len();
assert_eq!(
n, 1,
"throttle must coalesce 100-frame burst into a single notification (got {n})"
);
}
// Channel names for book, ticker and trades URIs match the expected strings.
#[test]
fn channel_names_match_deribit_taxonomy() {
assert_eq!(
channel_name_for(&ResourceUri::Book {
instrument: "BTC-PERPETUAL".to_string()
}),
"book.BTC-PERPETUAL.raw"
);
assert_eq!(
channel_name_for(&ResourceUri::Ticker {
instrument: "ETH-PERPETUAL".to_string()
}),
"ticker.ETH-PERPETUAL.100ms"
);
assert_eq!(
channel_name_for(&ResourceUri::Trades {
instrument: "BTC-31MAY24-50000-C".to_string()
}),
"trades.BTC-31MAY24-50000-C.raw"
);
}
}