tradingkit 0.1.0

Exchange-agnostic trading library for equities and crypto
Documentation
//! Data access abstractions that separate direct exchange reads from cached reads.
//!
//! # Design
//!
//! - `Source<Q, T>` fetches authoritative data from an exchange or upstream service.
//! - `Store<Q, T>` persists previously fetched data in memory or another sink.
//! - `Repository<S, C>` composes a source and a store into a cache-first read facade.
//!
//! This layer is intentionally read-focused. Command-style operations such as order submission
//! remain in `trading::TradingApi` and should invalidate or bypass caches at the call site.
//!
//! # Examples
//!
//! Direct read from the exchange adapter via `DataApi`:
//!
//! ```rust,no_run
//! use tradingkit::data::DataApi;
//! use tradingkit::exchange::alpaca::{Alpaca, AlpacaCredentials};
//!
//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
//!     let alpaca = Alpaca::paper(AlpacaCredentials::new(
//!         "your-key".to_string(),
//!         "your-secret".to_string(),
//!     ));
//!     let account = alpaca.get_account().await?;
//!     let _status = account.status;
//!     Ok(())
//! }
//! ```

use std::collections::HashMap;
use std::error::Error;
use std::hash::Hash;
use std::sync::Mutex;

use crate::model::{ListOrdersRequest, OptionChainSnapshot, Order, TradingAccount};

/// Exchange data query surface covering account snapshots, order lookups, and market data.
#[allow(async_fn_in_trait)]
pub trait DataApi {
    /// Reads the current trading account snapshot.
    async fn get_account(&self) -> Result<TradingAccount, Box<dyn Error>>;

    /// Reads one order by exchange order id.
    async fn get_order(&self, order_id: &str) -> Result<Order, Box<dyn Error>>;

    /// Reads one order by client order id.
    async fn get_order_by_client_id(&self, client_order_id: &str) -> Result<Order, Box<dyn Error>>;

    /// Lists orders using the provided filters.
    async fn list_orders(&self, request: &ListOrdersRequest) -> Result<Vec<Order>, Box<dyn Error>>;

    /// Reads the current option chain snapshots for the underlying symbol.
    async fn get_option_chain(
        &self,
        underlying_symbol: &str,
    ) -> Result<Vec<OptionChainSnapshot>, Box<dyn Error>>;
}

/// Key type used when reading the current trading account snapshot.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AccountQuery;

/// Key type used when reading one order by exchange order id.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OrderByIdQuery {
    pub order_id: String,
}

/// Key type used when reading one order by client order id.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OrderByClientIdQuery {
    pub client_order_id: String,
}

/// Key type used when reading the current option chain for an underlying symbol.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OptionChainQuery {
    pub underlying_symbol: String,
}

/// Authoritative read source such as an exchange REST adapter.
#[allow(async_fn_in_trait)]
pub trait Source<Q, T> {
    /// Fetches fresh data for the given query.
    async fn fetch(&self, query: &Q) -> Result<T, Box<dyn Error>>;
}

/// Cache or sink abstraction for persisted query results.
#[allow(async_fn_in_trait)]
pub trait Store<Q, T> {
    /// Returns a cached value when it exists.
    async fn get(&self, query: &Q) -> Result<Option<T>, Box<dyn Error>>;

    /// Persists a value for future reads.
    async fn put(&self, query: &Q, value: &T) -> Result<(), Box<dyn Error>>;
}

/// Cache-first facade that composes a source with a store.
pub struct Repository<S, C> {
    source: S,
    store: C,
}

impl<S, C> Repository<S, C> {
    /// Creates a new repository from a source and a store.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use tradingkit::data::{InMemoryStore, Repository};
    ///
    /// let source = InMemoryStore::<String, String>::new();
    /// let store = InMemoryStore::<String, String>::new();
    /// let _ = Repository::new(source, store);
    /// ```
    pub fn new(source: S, store: C) -> Self {
        Repository { source, store }
    }
}

impl<S, C> Repository<S, C> {
    /// Tries the store first, then falls back to the source with write-through persistence.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use tradingkit::data::{InMemoryStore, Repository, Source};
    ///
    /// // Source would be a ClickHouse adapter or similar persistent layer.
    /// // Repository composes any Source with a Store for cache-first reads.
    /// ```
    pub async fn get<Q, T>(&self, query: &Q) -> Result<T, Box<dyn Error>>
    where
        S: Source<Q, T>,
        C: Store<Q, T>,
    {
        if let Some(cached) = self.store.get(query).await? {
            return Ok(cached);
        }

        let fresh = self.source.fetch(query).await?;
        self.store.put(query, &fresh).await?;

        Ok(fresh)
    }
}

impl<Q, T, S, C> Source<Q, T> for Repository<S, C>
where
    S: Source<Q, T>,
    C: Store<Q, T>,
{
    async fn fetch(&self, query: &Q) -> Result<T, Box<dyn Error>> {
        self.get(query).await
    }
}

/// Simple in-memory store suitable for tests and local process caches.
pub struct InMemoryStore<Q, T> {
    values: Mutex<HashMap<Q, T>>,
}

impl<Q, T> InMemoryStore<Q, T> {
    /// Creates an empty in-memory store.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use tradingkit::data::InMemoryStore;
    ///
    /// let _store = InMemoryStore::<String, String>::new();
    /// ```
    pub fn new() -> Self {
        InMemoryStore {
            values: Mutex::new(HashMap::new()),
        }
    }
}

impl<Q, T> Default for InMemoryStore<Q, T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<Q, T> Store<Q, T> for InMemoryStore<Q, T>
where
    Q: Eq + Hash + Clone,
    T: Clone,
{
    async fn get(&self, query: &Q) -> Result<Option<T>, Box<dyn Error>> {
        let values = self
            .values
            .lock()
            .map_err(|_| std::io::Error::other("in-memory store lock poisoned"))?;

        Ok(values.get(query).cloned())
    }

    async fn put(&self, query: &Q, value: &T) -> Result<(), Box<dyn Error>> {
        let mut values = self
            .values
            .lock()
            .map_err(|_| std::io::Error::other("in-memory store lock poisoned"))?;

        values.insert(query.clone(), value.clone());

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    struct CountingSource {
        calls: Arc<AtomicUsize>,
        value: String,
    }

    impl CountingSource {
        fn new(calls: Arc<AtomicUsize>, value: String) -> Self {
            CountingSource { calls, value }
        }
    }

    impl Source<String, String> for CountingSource {
        async fn fetch(&self, _query: &String) -> Result<String, Box<dyn Error>> {
            self.calls.fetch_add(1, Ordering::SeqCst);
            Ok(self.value.clone())
        }
    }

    #[tokio::test]
    async fn test_repository_reads_from_source_then_cache() {
        let calls = Arc::new(AtomicUsize::new(0));
        let repository = Repository::new(
            CountingSource::new(calls.clone(), "fresh".to_string()),
            InMemoryStore::<String, String>::new(),
        );
        let query = "account".to_string();

        let first = repository.get(&query).await.unwrap();
        let second = repository.get(&query).await.unwrap();

        assert_eq!(first, "fresh");
        assert_eq!(second, "fresh");
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_in_memory_store_returns_written_value() {
        let store = InMemoryStore::<String, String>::new();
        let query = "order:1".to_string();
        let value = "cached".to_string();

        store.put(&query, &value).await.unwrap();

        assert_eq!(store.get(&query).await.unwrap(), Some("cached".to_string()));
    }
}