1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
//! Snapshot cache module.
#![feature(try_trait)]

#[macro_use]
extern crate failure;
extern crate futures_locks;

#[macro_use]
extern crate err_convert_macro;
extern crate cxmr_exchanges;
extern crate cxmr_snapshots;

use std::cell::Cell;
use std::collections::HashMap;
use std::sync::Arc;

use futures_locks::RwLock;

use cxmr_exchanges::Market;
use cxmr_snapshots::Snapshot;

/// Snapshot cache module error.
///
/// Every fallible operation in this module returns this type.
#[derive(Debug, Fail)]
pub enum Error {
    /// An `Option` that was expected to hold a value was `None`.
    #[fail(display = "option none")]
    OptionNone,

    /// Error reported by the `cxmr_snapshots` crate; the original error is
    /// kept as the cause.
    #[fail(display = "snapshot error: {:?}", _0)]
    Snapshot(#[cause] cxmr_snapshots::Error),
}

// NOTE(review): these macros come from `err_convert_macro`; presumably they
// generate `From<cxmr_snapshots::Error>` and `From<NoneError>` conversions
// into the named variants so that `?` can be used — confirm against the macro
// crate.
err_converter!(Snapshot, cxmr_snapshots::Error);
err_converter_no_args!(OptionNone, std::option::NoneError);

/// Snapshot cache options.
///
/// Construct with [`Default::default`] and override individual fields as
/// needed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Options {
    /// Maximum snapshots in memory.
    ///
    /// NOTE(review): no code in this file reads this value, so the limit
    /// does not appear to be enforced yet — confirm before relying on it.
    pub maximum_snapshots: usize,
}

impl Default for Options {
    /// Returns options with `maximum_snapshots` set to `10`.
    fn default() -> Self {
        Self {
            maximum_snapshots: 10,
        }
    }
}

/// Shared cached snapshot.
///
/// A reference-counted [`Snapshot`] behind an asynchronous `futures_locks`
/// read-write lock; this is the handle type returned by the cache.
pub type SharedSnapshot = Arc<RwLock<Snapshot>>;

/// Snapshot in-memory cache.
///
/// This is a handle: cloning it copies only the `Arc`, so every clone refers
/// to the same underlying cache state.
#[derive(Clone)]
pub struct SnapshotCache {
    // Shared cache state, guarded by an asynchronous read-write lock.
    inner: Arc<RwLock<CacheImpl>>,
}

impl SnapshotCache {
    /// Creates a new in-memory snapshot cache configured with `options`.
    pub fn new(options: Options) -> Self {
        SnapshotCache {
            inner: Arc::new(RwLock::new(CacheImpl::new(options))),
        }
    }

    /// Opens a snapshot and stores it in the memory cache.
    ///
    /// The cache entry is identified by `market`, `timestamp` and `interval`.
    /// If an entry for that key already holds a snapshot, the cached handle
    /// is returned and `path` is not used.
    ///
    /// Only a shared borrow is required: the method takes a read lock on the
    /// shared state, so it can be called through any clone of the handle.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if opening the snapshot fails.
    pub async fn open(
        &self,
        path: &str,
        market: Market,
        timestamp: u64,
        interval: u64,
    ) -> Result<SharedSnapshot, Error> {
        // The read guard stays alive until the inner open has completed.
        let cache = self.inner.read().await;
        cache.open(path, market, timestamp, interval).await
    }

    /// Opens a snapshot and stores it in the memory cache.
    ///
    /// NOTE(review): despite its name, this method currently does exactly
    /// what [`SnapshotCache::open`] does; no compression or on-disk write is
    /// performed here. TODO: implement the compressed on-disk copy or retire
    /// this entry point.
    ///
    /// # Errors
    ///
    /// Returns an [`Error`] if opening the snapshot fails.
    pub async fn open_and_compress(
        &self,
        path: &str,
        market: Market,
        timestamp: u64,
        interval: u64,
    ) -> Result<SharedSnapshot, Error> {
        // Delegate instead of duplicating the body so the two entry points
        // cannot drift apart silently.
        self.open(path, market, timestamp, interval).await
    }
}

/// Snapshot cache key.
///
/// Two requests share a cache entry when all three fields are equal. The
/// snapshot path is not part of the key.
#[derive(Hash, PartialEq, Eq)]
struct CacheKey {
    // Market the snapshot belongs to.
    market: Market,
    // Timestamp passed by the caller (unit not visible here — TODO confirm).
    timestamp: u64,
    // Interval passed by the caller; also forwarded to `Snapshot::open`.
    interval: u64,
}

/// Snapshot cache implementation.
///
/// Holds the state shared by every `SnapshotCache` handle.
struct CacheImpl {
    // Cache configuration.
    // NOTE(review): stored but not read anywhere in this file — confirm
    // whether `maximum_snapshots` is meant to be enforced here.
    options: Options,
    // Cache entries by key, behind their own asynchronous read-write lock.
    snapshots: RwLock<HashMap<CacheKey, CachedSnapshot>>,
}

impl CacheImpl {
    /// Builds an empty cache that keeps `options` alongside the entry map.
    fn new(options: Options) -> Self {
        let snapshots = RwLock::new(HashMap::new());
        CacheImpl { options, snapshots }
    }

    /// Returns the snapshot for `(market, timestamp, interval)`, opening it
    /// from `path` when the entry does not hold one yet.
    ///
    /// `path` is not part of the key, so it only matters for the call that
    /// actually opens the snapshot.
    async fn open(
        &self,
        path: &str,
        market: Market,
        timestamp: u64,
        interval: u64,
    ) -> Result<SharedSnapshot, Error> {
        let key = CacheKey {
            market,
            timestamp,
            interval,
        };
        // Hold the map's write lock only long enough to fetch (or insert an
        // empty) entry; the entry is a cheap shared handle, so the snapshot
        // itself is opened after the map lock is released.
        let mut entry = {
            let mut table = self.snapshots.write().await;
            table.entry(key).or_default().clone()
        };
        entry.open(path, interval).await
    }
}

/// Cached snapshot.
#[derive(Default, Clone)]
struct CachedSnapshot {
    inner: Arc<RwLock<Cell<Option<SharedSnapshot>>>>,
}

impl CachedSnapshot {
    async fn open(&mut self, path: &str, interval: u64) -> Result<SharedSnapshot, Error> {
        let mut inner = self.inner.write().await;
        if let Some(snapshot) = inner.get_mut() {
            return Ok(snapshot.clone());
        }
        let snapshot = Arc::new(RwLock::new(Snapshot::open(path, interval)?));
        inner.set(Some(snapshot.clone()));
        Ok(snapshot)
    }
}