use std::fmt::Debug;
use std::future;
use std::time::SystemTime;
use fedimint_core::core::OperationId;
use fedimint_core::db::{Database, DatabaseTransaction};
use fedimint_core::encoding::{Decodable, DecodeError, Encodable};
use fedimint_core::module::registry::ModuleDecoderRegistry;
use fedimint_core::task::{MaybeSend, MaybeSync};
use fedimint_core::util::BoxStream;
use fedimint_core::{apply, async_trait_maybe_send};
use futures::{StreamExt, stream};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(transparent)]
pub struct JsonStringed(pub serde_json::Value);
impl Encodable for JsonStringed {
fn consensus_encode<W: std::io::Write>(&self, writer: &mut W) -> Result<(), std::io::Error> {
let json_str = serde_json::to_string(&self.0).expect("JSON serialization should not fail");
json_str.consensus_encode(writer)
}
}
impl Decodable for JsonStringed {
fn consensus_decode_partial<R: std::io::Read>(
r: &mut R,
modules: &ModuleDecoderRegistry,
) -> Result<Self, DecodeError> {
let json_str = String::consensus_decode_partial(r, modules)?;
let value = serde_json::from_str(&json_str).map_err(DecodeError::from_err)?;
Ok(JsonStringed(value))
}
}
#[apply(async_trait_maybe_send!)]
pub trait IOperationLog {
async fn get_operation(&self, operation_id: OperationId) -> Option<OperationLogEntry>;
async fn get_operation_dbtx(
&self,
dbtx: &mut DatabaseTransaction<'_>,
operation_id: OperationId,
) -> Option<OperationLogEntry>;
async fn add_operation_log_entry_dbtx(
&self,
dbtx: &mut DatabaseTransaction<'_>,
operation_id: OperationId,
operation_type: &str,
operation_meta: serde_json::Value,
);
fn outcome_or_updates(
&self,
db: &Database,
operation_id: OperationId,
operation_log_entry: OperationLogEntry,
stream_gen: Box<dyn FnOnce() -> BoxStream<'static, serde_json::Value>>,
) -> UpdateStreamOrOutcome<serde_json::Value>;
}
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable, PartialEq, Eq)]
pub struct OperationOutcome {
pub time: SystemTime,
pub outcome: JsonStringed,
}
#[derive(Debug, Serialize, Deserialize, Encodable, Decodable)]
pub struct OperationLogEntry {
pub(crate) operation_module_kind: String,
pub(crate) meta: JsonStringed,
pub(crate) outcome: Option<OperationOutcome>,
}
impl OperationLogEntry {
pub fn new(
operation_module_kind: String,
meta: JsonStringed,
outcome: Option<OperationOutcome>,
) -> Self {
Self {
operation_module_kind,
meta,
outcome,
}
}
pub fn operation_module_kind(&self) -> &str {
&self.operation_module_kind
}
pub fn meta<M: DeserializeOwned>(&self) -> M {
self.try_meta()
.expect("JSON deserialization should not fail")
}
pub fn try_meta<M: DeserializeOwned>(&self) -> Result<M, serde_json::Error> {
serde_json::from_value(self.meta.0.clone())
}
pub fn outcome<D: DeserializeOwned>(&self) -> Option<D> {
self.try_outcome()
.expect("JSON deserialization should not fail")
}
pub fn try_outcome<D: DeserializeOwned>(&self) -> Result<Option<D>, serde_json::Error> {
self.outcome
.as_ref()
.map(|outcome| serde_json::from_value(outcome.outcome.0.clone()))
.transpose()
}
pub fn outcome_time(&self) -> Option<SystemTime> {
self.outcome.as_ref().map(|o| o.time)
}
pub fn set_outcome(&mut self, outcome: impl Into<Option<OperationOutcome>>) {
self.outcome = outcome.into();
}
}
pub enum UpdateStreamOrOutcome<U> {
UpdateStream(BoxStream<'static, U>),
Outcome(U),
}
impl<U: Debug> Debug for UpdateStreamOrOutcome<U> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
UpdateStreamOrOutcome::UpdateStream(_) => write!(f, "UpdateStream"),
UpdateStreamOrOutcome::Outcome(o) => f.debug_tuple("Outcome").field(o).finish(),
}
}
}
impl<U> UpdateStreamOrOutcome<U>
where
U: MaybeSend + MaybeSync + 'static,
{
pub fn into_stream(self) -> BoxStream<'static, U> {
match self {
UpdateStreamOrOutcome::UpdateStream(stream) => stream,
UpdateStreamOrOutcome::Outcome(outcome) => {
Box::pin(stream::once(future::ready(outcome)))
}
}
}
pub async fn await_outcome(self) -> Option<U> {
match self {
UpdateStreamOrOutcome::Outcome(outcome) => Some(outcome),
UpdateStreamOrOutcome::UpdateStream(mut stream) => {
let mut last_update = None;
while let Some(update) = stream.next().await {
last_update = Some(update);
}
last_update
}
}
}
}
#[cfg(test)]
mod tests {
use futures::stream;
use serde_json::Value;
use super::*;
#[tokio::test]
async fn test_await_outcome_cached() {
let test_value = serde_json::json!({"status": "completed", "amount": 100});
let cached_outcome = UpdateStreamOrOutcome::Outcome(test_value.clone());
let result = cached_outcome.await_outcome().await;
assert_eq!(result, Some(test_value));
}
#[tokio::test]
async fn test_await_outcome_uncached_with_updates() {
let update_stream = Box::pin(stream::iter(vec![
Value::from(0),
Value::from(1),
Value::from(2),
]));
let uncached_outcome = UpdateStreamOrOutcome::UpdateStream(update_stream);
let result = uncached_outcome.await_outcome().await;
assert_eq!(result, Some(Value::from(2)));
}
#[tokio::test]
async fn test_await_outcome_uncached_empty_stream() {
let empty_stream = Box::pin(stream::empty::<serde_json::Value>());
let uncached_outcome = UpdateStreamOrOutcome::UpdateStream(empty_stream);
let result = uncached_outcome.await_outcome().await;
assert_eq!(result, None);
}
#[tokio::test]
async fn test_await_outcome_uncached_single_update() {
let update_stream = Box::pin(stream::once(async { Value::from(0) }));
let uncached_outcome = UpdateStreamOrOutcome::UpdateStream(update_stream);
let result = uncached_outcome.await_outcome().await;
assert_eq!(result, Some(Value::from(0)));
}
}