#![allow(unused)]
use crate::export::{DataExporter, ExportResult};
use crate::types::{OHLC, Tick};
use couch_rs::{Client, database::Database, document::TypedCouchDocument, error::CouchError};
use rust_decimal::prelude::ToPrimitive;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::borrow::Cow;
pub struct CouchDbExporter {
server_url: String,
database_name: String,
username: Option<String>,
password: Option<String>,
batch_size: usize,
}
impl CouchDbExporter {
pub fn new(server_url: impl Into<String>, database_name: impl Into<String>) -> Self {
Self {
server_url: server_url.into(),
database_name: database_name.into(),
username: None,
password: None,
batch_size: 1000,
}
}
#[cfg(feature = "dotenvy")]
pub fn from_env() -> Self {
let _ = dotenvy::dotenv();
let server_url = std::env::var("COUCHDB_URL")
.unwrap_or_else(|_| "http://localhost:5984".to_string());
let database_name = std::env::var("COUCHDB_DATABASE")
.unwrap_or_else(|_| "market_data".to_string());
let mut exporter = Self::new(server_url, database_name);
if let (Ok(username), Ok(password)) = (
std::env::var("COUCHDB_USERNAME"),
std::env::var("COUCHDB_PASSWORD")
) {
exporter = exporter.with_auth(username, password);
}
if let Ok(batch_size) = std::env::var("EXPORT_BATCH_SIZE") {
if let Ok(size) = batch_size.parse() {
exporter = exporter.with_batch_size(size);
}
}
exporter
}
pub fn with_auth(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
self.username = Some(username.into());
self.password = Some(password.into());
self
}
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}
pub fn new_with_options(server_url: impl Into<String>, database_name: impl Into<String>, options: CouchDbOptions) -> Self {
Self {
server_url: server_url.into(),
database_name: database_name.into(),
username: options.username,
password: options.password,
batch_size: options.batch_size,
}
}
async fn get_database(&self) -> Result<Database, CouchError> {
let client = if let (Some(username), Some(password)) = (&self.username, &self.password) {
Client::new(&self.server_url, username, password)?
} else {
Client::new_no_auth(&self.server_url)?
};
let db = match client.db(&self.database_name).await {
Ok(db) => db,
Err(_) => {
client.make_db(&self.database_name).await?;
client.db(&self.database_name).await?
}
};
Ok(db)
}
pub async fn export_ohlc_async(&self, data: &[OHLC]) -> ExportResult<()> {
let db = self.get_database().await?;
let mut documents: Vec<OhlcDocument> = data.iter()
.map(|ohlc| OhlcDocument::from_ohlc(ohlc, "MARKET"))
.collect();
for chunk in documents.chunks_mut(self.batch_size) {
db.bulk_docs(chunk).await?;
}
self.create_views(&db).await?;
Ok(())
}
pub async fn export_ticks_async(&self, data: &[Tick]) -> ExportResult<()> {
let db = self.get_database().await?;
let mut documents: Vec<TickDocument> = data.iter()
.map(|tick| TickDocument::from_tick(tick, "MARKET"))
.collect();
for chunk in documents.chunks_mut(self.batch_size) {
db.bulk_docs(chunk).await?;
}
self.create_views(&db).await?;
Ok(())
}
async fn create_views(&self, db: &Database) -> Result<(), CouchError> {
let design_doc = serde_json::json!({
"_id": "_design/market_data",
"views": {
"by_timestamp": {
"map": "function(doc) { if(doc.timestamp) { emit(doc.timestamp, doc); } }"
},
"by_symbol_and_timestamp": {
"map": "function(doc) { if(doc.symbol && doc.timestamp) { emit([doc.symbol, doc.timestamp], doc); } }"
},
"by_type": {
"map": "function(doc) { if(doc.doc_type) { emit(doc.doc_type, doc); } }"
},
"ohlc_by_date_range": {
"map": "function(doc) { if(doc.doc_type === 'ohlc' && doc.timestamp) { emit(doc.timestamp, {open: doc.open, high: doc.high, low: doc.low, close: doc.close, volume: doc.volume}); } }"
},
"ticks_by_date_range": {
"map": "function(doc) { if(doc.doc_type === 'tick' && doc.timestamp) { emit(doc.timestamp, {price: doc.price, bid: doc.bid, ask: doc.ask, volume: doc.volume}); } }"
}
}
});
let mut doc = design_doc.clone();
match db.save(&mut doc).await {
Ok(_) => Ok(()),
Err(_) => {
match db.get::<serde_json::Value>("_design/market_data").await {
Ok(mut existing) => {
existing["views"] = design_doc["views"].clone();
db.save(&mut existing).await?;
Ok(())
},
Err(e) => Err(e)
}
}
}
}
}
impl DataExporter for CouchDbExporter {
fn export_ohlc<P: AsRef<Path>>(&self, data: &[OHLC], path: P) -> ExportResult<()> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(self.export_ohlc_async(data))
}
fn export_ticks<P: AsRef<Path>>(&self, data: &[Tick], path: P) -> ExportResult<()> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(self.export_ticks_async(data))
}
fn export_ohlc_to_writer<W: std::io::Write>(&self, data: &[OHLC], mut writer: W) -> ExportResult<()> {
let documents: Vec<OhlcDocument> = data.iter()
.map(|ohlc| OhlcDocument::from_ohlc(ohlc, "MARKET"))
.collect();
for doc in documents {
let json = serde_json::to_string(&doc)?;
writeln!(writer, "{json}")?;
}
Ok(())
}
fn export_ticks_to_writer<W: std::io::Write>(&self, data: &[Tick], mut writer: W) -> ExportResult<()> {
let documents: Vec<TickDocument> = data.iter()
.map(|tick| TickDocument::from_tick(tick, "MARKET"))
.collect();
for doc in documents {
let json = serde_json::to_string(&doc)?;
writeln!(writer, "{json}")?;
}
Ok(())
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct OhlcDocument {
#[serde(rename = "_id")]
id: String,
#[serde(rename = "_rev", skip_serializing_if = "Option::is_none")]
rev: Option<String>,
doc_type: String,
symbol: String,
timestamp: i64,
open: f64,
high: f64,
low: f64,
close: f64,
volume: f64,
}
impl OhlcDocument {
fn from_ohlc(ohlc: &OHLC, symbol: &str) -> Self {
let id = format!("ohlc_{}_{}", symbol, ohlc.timestamp);
Self {
id,
rev: None,
doc_type: "ohlc".to_string(),
symbol: symbol.to_string(),
timestamp: ohlc.timestamp,
open: ohlc.open.to_f64().unwrap_or(0.0),
high: ohlc.high.to_f64().unwrap_or(0.0),
low: ohlc.low.to_f64().unwrap_or(0.0),
close: ohlc.close.to_f64().unwrap_or(0.0),
volume: ohlc.volume.as_f64(),
}
}
}
impl TypedCouchDocument for OhlcDocument {
fn get_id(&self) -> Cow<'_, str> {
Cow::Borrowed(&self.id)
}
fn get_rev(&self) -> Cow<'_, str> {
match &self.rev {
Some(rev) => Cow::Borrowed(rev),
None => Cow::Borrowed(""),
}
}
fn set_id(&mut self, id: &str) {
self.id = id.to_string();
}
fn set_rev(&mut self, rev: &str) {
self.rev = Some(rev.to_string());
}
fn merge_ids(&mut self, other: &Self) {
self.id = other.id.clone();
self.rev = other.rev.clone();
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct TickDocument {
#[serde(rename = "_id")]
id: String,
#[serde(rename = "_rev", skip_serializing_if = "Option::is_none")]
rev: Option<String>,
doc_type: String,
symbol: String,
timestamp: i64,
price: f64,
bid: f64,
ask: f64,
volume: f64,
}
impl TickDocument {
fn from_tick(tick: &Tick, symbol: &str) -> Self {
let id = format!("tick_{}_{}", symbol, tick.timestamp);
Self {
id,
rev: None,
doc_type: "tick".to_string(),
symbol: symbol.to_string(),
timestamp: tick.timestamp,
price: tick.price.to_f64().unwrap_or(0.0),
bid: tick.bid.unwrap_or(tick.price).to_f64().unwrap_or(0.0),
ask: tick.ask.unwrap_or(tick.price).to_f64().unwrap_or(0.0),
volume: tick.volume.as_f64(),
}
}
}
impl TypedCouchDocument for TickDocument {
fn get_id(&self) -> Cow<'_, str> {
Cow::Borrowed(&self.id)
}
fn get_rev(&self) -> Cow<'_, str> {
match &self.rev {
Some(rev) => Cow::Borrowed(rev),
None => Cow::Borrowed(""),
}
}
fn set_id(&mut self, id: &str) {
self.id = id.to_string();
}
fn set_rev(&mut self, rev: &str) {
self.rev = Some(rev.to_string());
}
fn merge_ids(&mut self, other: &Self) {
self.id = other.id.clone();
self.rev = other.rev.clone();
}
}
#[derive(Debug, Clone)]
pub struct CouchDbOptions {
pub server_url: String,
pub database_name: String,
pub username: Option<String>,
pub password: Option<String>,
pub batch_size: usize,
}
impl Default for CouchDbOptions {
fn default() -> Self {
Self {
server_url: "http://localhost:5984".to_string(),
database_name: "market_data".to_string(),
username: None,
password: None,
batch_size: 1000,
}
}
}
impl CouchDbOptions {
#[inline]
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn with_server(mut self, url: impl Into<String>) -> Self {
self.server_url = url.into();
self
}
#[inline]
pub fn with_database(mut self, name: impl Into<String>) -> Self {
self.database_name = name.into();
self
}
pub fn with_auth(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
self.username = Some(username.into());
self.password = Some(password.into());
self
}
#[inline]
pub fn with_batch_size(mut self, size: usize) -> Self {
self.batch_size = size;
self
}
#[inline]
pub fn batch_size(mut self, size: usize) -> Self {
self.batch_size = size;
self
}
pub fn timeout_seconds(self, timeout: u64) -> Self {
self
}
pub fn auto_create_database(self, auto_create: bool) -> Self {
self
}
pub fn username(mut self, username: impl Into<String>) -> Self {
self.username = Some(username.into());
self
}
pub fn password(mut self, password: impl Into<String>) -> Self {
self.password = Some(password.into());
self
}
}
#[cfg(test)]
mod tests {
use super::*;
use rust_decimal::Decimal;
use std::str::FromStr;
#[test]
fn test_couchdb_exporter_creation() {
let exporter = CouchDbExporter::new("http://localhost:5984", "test_db");
assert_eq!(exporter.server_url, "http://localhost:5984");
assert_eq!(exporter.database_name, "test_db");
assert_eq!(exporter.batch_size, 1000);
}
#[test]
#[cfg(feature = "dotenvy")]
fn test_couchdb_exporter_from_env() {
let exporter = CouchDbExporter::from_env();
assert!(!exporter.server_url.is_empty());
assert!(!exporter.database_name.is_empty());
}
#[test]
fn test_couchdb_exporter_with_auth() {
let exporter = CouchDbExporter::new("http://localhost:5984", "test_db")
.with_auth("admin", "password");
assert_eq!(exporter.username, Some("admin".to_string()));
assert_eq!(exporter.password, Some("password".to_string()));
}
#[test]
fn test_couchdb_exporter_with_batch_size() {
let exporter = CouchDbExporter::new("http://localhost:5984", "test_db")
.with_batch_size(500);
assert_eq!(exporter.batch_size, 500);
}
#[test]
fn test_ohlc_document_from_ohlc() {
use crate::types::Volume;
let ohlc = OHLC {
timestamp: 1234567890,
open: Decimal::from(100),
high: Decimal::from(110),
low: Decimal::from(95),
close: Decimal::from(105),
volume: Volume::new(1000),
};
let doc = OhlcDocument::from_ohlc(&ohlc, "TEST");
assert_eq!(doc.id, "ohlc_TEST_1234567890");
assert_eq!(doc.symbol, "TEST");
assert_eq!(doc.timestamp, 1234567890);
assert_eq!(doc.open, 100.0);
assert_eq!(doc.high, 110.0);
assert_eq!(doc.low, 95.0);
assert_eq!(doc.close, 105.0);
assert_eq!(doc.volume, 1000.0);
}
#[test]
fn test_tick_document_from_tick() {
use crate::types::Volume;
let tick = Tick {
timestamp: 1234567890,
price: Decimal::from(100),
bid: Some(Decimal::from_str("99.5").unwrap()),
ask: Some(Decimal::from_str("100.5").unwrap()),
volume: Volume::new(100),
};
let doc = TickDocument::from_tick(&tick, "TEST");
assert_eq!(doc.id, "tick_TEST_1234567890");
assert_eq!(doc.symbol, "TEST");
assert_eq!(doc.timestamp, 1234567890);
assert_eq!(doc.price, 100.0);
assert_eq!(doc.bid, 99.5);
assert_eq!(doc.ask, 100.5);
assert_eq!(doc.volume, 100.0);
}
#[test]
fn test_couchdb_options_default() {
let options = CouchDbOptions::default();
assert_eq!(options.server_url, "http://localhost:5984");
assert_eq!(options.database_name, "market_data");
assert_eq!(options.batch_size, 1000);
assert!(options.username.is_none());
assert!(options.password.is_none());
}
#[test]
fn test_couchdb_options_builder() {
let options = CouchDbOptions::default()
.with_server("http://couchdb:5984")
.with_database("my_data")
.with_auth("user", "pass")
.with_batch_size(2000);
assert_eq!(options.server_url, "http://couchdb:5984");
assert_eq!(options.database_name, "my_data");
assert_eq!(options.username, Some("user".to_string()));
assert_eq!(options.password, Some("pass".to_string()));
assert_eq!(options.batch_size, 2000);
}
}