use super::KrakenMessage;
use crate::{
Identifier,
event::{MarketEvent, MarketIter},
subscription::trade::PublicTrade,
};
use barter_instrument::{Side, exchange::ExchangeId};
use barter_integration::{
serde::de::{datetime_utc_from_epoch_duration, extract_next},
subscription::SubscriptionId,
};
use chrono::{DateTime, Utc};
use serde::Serialize;
/// Terse type alias for a Kraken trades WebSocket message.
///
/// This is a [`KrakenMessage`] whose data payload is a [`KrakenTradesInner`]; the
/// `Data` variant carries the trades, while the `Event` variant carries no trades
/// (it is mapped to an empty [`MarketIter`] by the `From` conversion in this file).
pub type KrakenTrades = KrakenMessage<KrakenTradesInner>;
/// Batch of [`KrakenTrade`]s together with the [`SubscriptionId`] they belong to.
///
/// Deserialized by a hand-written `Deserialize` impl from a positional sequence of the
/// form `[channelID, [trade, ...], channelName, pair]`; only `Serialize` is derived.
#[derive(Clone, PartialEq, PartialOrd, Debug, Serialize)]
pub struct KrakenTradesInner {
// Built during deserialization as `trade|{pair}` from the message's "pair" element.
pub subscription_id: SubscriptionId,
// Every trade contained in the message, in the order it appeared in the payload.
pub trades: Vec<KrakenTrade>,
}
/// Single public trade as reported inside a Kraken trades message.
///
/// Deserialized by a hand-written `Deserialize` impl from a positional sequence of the
/// form `[price, quantity, time, side, ...]`; only `Serialize` is derived, so the
/// `serde` attributes below affect serialization only.
#[derive(Copy, Clone, PartialEq, PartialOrd, Debug, Serialize)]
pub struct KrakenTrade {
// Trade price, parsed from the string in the first sequence position.
pub price: f64,
// Trade quantity; serialized under the key "quantity" rather than "amount".
#[serde(rename = "quantity")]
pub amount: f64,
// Exchange timestamp, parsed from a string of fractional seconds since the epoch.
pub time: DateTime<Utc>,
// Trade side, deserialized directly into [`Side`].
pub side: Side,
}
/// Exposes the [`SubscriptionId`] carried by a [`KrakenTradesInner`].
impl Identifier<Option<SubscriptionId>> for KrakenTradesInner {
    /// Returns an owned copy of the stored `subscription_id`; this is always `Some`.
    fn id(&self) -> Option<SubscriptionId> {
        Some(&self.subscription_id).cloned()
    }
}
fn custom_kraken_trade_id(trade: &KrakenTrade) -> String {
format!(
"{}_{}_{}_{}",
trade.time.timestamp_micros(),
trade.side,
trade.price,
trade.amount
)
}
/// Converts a [`KrakenTrades`] message into a [`MarketIter`] of [`PublicTrade`] events.
///
/// A `Data` message yields one `Ok(MarketEvent)` per contained trade; an `Event`
/// message yields an empty iterator.
impl<InstrumentKey> From<(ExchangeId, InstrumentKey, KrakenTrades)>
    for MarketIter<InstrumentKey, PublicTrade>
where
    InstrumentKey: Clone,
{
    fn from((exchange, instrument, trades): (ExchangeId, InstrumentKey, KrakenTrades)) -> Self {
        // Non-data messages carry no trades, so there is nothing to emit.
        let inner = match trades {
            KrakenTrades::Event(_) => return Self(vec![]),
            KrakenTrades::Data(inner) => inner,
        };

        inner
            .trades
            .into_iter()
            .map(|trade| {
                Ok(MarketEvent {
                    time_exchange: trade.time,
                    // Stamped per trade at conversion time.
                    time_received: Utc::now(),
                    exchange,
                    instrument: instrument.clone(),
                    kind: PublicTrade {
                        // Built from the trade's own fields via `custom_kraken_trade_id`.
                        id: custom_kraken_trade_id(&trade),
                        price: trade.price,
                        amount: trade.amount,
                        side: trade.side,
                    },
                })
            })
            .collect()
    }
}
impl<'de> serde::de::Deserialize<'de> for KrakenTradesInner {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct SeqVisitor;
impl<'de> serde::de::Visitor<'de> for SeqVisitor {
type Value = KrakenTradesInner;
fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("KrakenTradesInner struct from the Kraken WebSocket API")
}
fn visit_seq<SeqAccessor>(
self,
mut seq: SeqAccessor,
) -> Result<Self::Value, SeqAccessor::Error>
where
SeqAccessor: serde::de::SeqAccess<'de>,
{
let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelID")?;
let trades = extract_next(&mut seq, "Vec<KrakenTrade>")?;
let _: serde::de::IgnoredAny = extract_next(&mut seq, "channelName")?;
let subscription_id = extract_next::<SeqAccessor, String>(&mut seq, "pair")
.map(|pair| SubscriptionId::from(format!("trade|{pair}")))?;
while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
Ok(KrakenTradesInner {
subscription_id,
trades,
})
}
}
deserializer.deserialize_seq(SeqVisitor)
}
}
impl<'de> serde::de::Deserialize<'de> for KrakenTrade {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::de::Deserializer<'de>,
{
struct SeqVisitor;
impl<'de> serde::de::Visitor<'de> for SeqVisitor {
type Value = KrakenTrade;
fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter.write_str("KrakenTrade struct from the Kraken WebSocket API")
}
fn visit_seq<SeqAccessor>(
self,
mut seq: SeqAccessor,
) -> Result<Self::Value, SeqAccessor::Error>
where
SeqAccessor: serde::de::SeqAccess<'de>,
{
let price = extract_next::<SeqAccessor, String>(&mut seq, "price")?
.parse()
.map_err(serde::de::Error::custom)?;
let amount = extract_next::<SeqAccessor, String>(&mut seq, "quantity")?
.parse()
.map_err(serde::de::Error::custom)?;
let time = extract_next::<SeqAccessor, String>(&mut seq, "time")?
.parse()
.map(|time| {
datetime_utc_from_epoch_duration(std::time::Duration::from_secs_f64(time))
})
.map_err(serde::de::Error::custom)?;
let side: Side = extract_next(&mut seq, "side")?;
while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
Ok(KrakenTrade {
price,
amount,
time,
side,
})
}
}
deserializer.deserialize_seq(SeqVisitor)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    mod de {
        use super::*;
        use barter_instrument::Side;
        use barter_integration::{
            error::SocketError, serde::de::datetime_utc_from_epoch_duration,
            subscription::SubscriptionId,
        };

        /// Checks that a Kraken trades payload deserializes into the expected
        /// [`KrakenTrades`] value.
        #[test]
        fn test_kraken_message_trades() {
            struct TestCase {
                input: &'static str,
                expected: Result<KrakenTrades, SocketError>,
            }

            // Builds an expected trade from plain values, converting the fractional
            // epoch seconds the same way the deserializer's expected output is defined.
            let trade = |price: f64, amount: f64, epoch_secs: f64, side: Side| KrakenTrade {
                price,
                amount,
                time: datetime_utc_from_epoch_duration(std::time::Duration::from_secs_f64(
                    epoch_secs,
                )),
                side,
            };

            let cases = vec![TestCase {
                // TC0: valid message containing one sell and one buy trade.
                input: r#"
[
0,
[
[
"5541.20000",
"0.15850568",
"1534614057.321597",
"s",
"l",
""
],
[
"6060.00000",
"0.02455000",
"1534614057.324998",
"b",
"l",
""
]
],
"trade",
"XBT/USD"
]
"#,
                expected: Ok(KrakenTrades::Data(KrakenTradesInner {
                    subscription_id: SubscriptionId::from("trade|XBT/USD"),
                    trades: vec![
                        trade(5541.2, 0.15850568, 1534614057.321597, Side::Sell),
                        trade(6060.0, 0.02455000, 1534614057.324998, Side::Buy),
                    ],
                })),
            }];

            for (index, TestCase { input, expected }) in cases.into_iter().enumerate() {
                match (serde_json::from_str::<KrakenTrades>(input), expected) {
                    (Ok(actual), Ok(expected)) => {
                        assert_eq!(actual, expected, "TC{} failed", index)
                    }
                    // Both sides failed: the test case expected an error and got one.
                    (Err(_), Err(_)) => {}
                    (actual, expected) => {
                        panic!(
                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                        );
                    }
                }
            }
        }
    }
}