1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
use std::{borrow::Cow, convert::TryFrom};

use chrono::{DateTime, TimeZone, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value as Json;
use sqlx::FromRow;
use uuid::Uuid;

/// An [`Event`](EventStore::Event) wrapper for events that have been
/// successfully committed to the [`EventStore`].
///
/// [`EventStream`]s are composed of these events.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Persisted<T> {
    aggregate_id: Uuid,
    utc: DateTime<Utc>,
    sequence: i64,
    data: T,
}

impl<T> Persisted<T> {
    /// Returns the event sequence number.
    pub fn sequence(&self) -> i64 {
        self.sequence
    }

    /// Returns the aggregate id.
    pub fn aggregate_id(&self) -> Uuid {
        self.aggregate_id
    }

    pub fn utc(&self) -> DateTime<Utc> {
        self.utc
    }

    pub fn data(&self) -> &T {
        &self.data
    }

    /// Unwraps the inner [`Event`](EventStore::Event) from the `Persisted`
    /// wrapper.
    pub fn into_data(self) -> T {
        self.data
    }
}

pub trait TimescaleEventPayload {
    fn name(&self) -> Cow<'static, str>;
}

#[derive(Debug, FromRow)]
pub struct EventRow {
    pub aggregate_id: Uuid,
    pub sequence: i64,
    pub name: String,
    pub payload: Option<Json>,
    pub time: i64,
}

impl TimescaleEventPayload for EventRow {
    fn name(&self) -> Cow<'static, str> {
        self.name.clone().into()
    }
}

impl<T> TryFrom<EventRow> for Persisted<T>
where
    for<'de> T: Deserialize<'de>,
{
    type Error = serde_json::error::Error;

    fn try_from(row: EventRow) -> serde_json::error::Result<Self> {
        let aggregate_id = row.aggregate_id;
        let utc = Utc.timestamp_nanos(row.time);
        let sequence = row.sequence;

        let event = Persisted {
            aggregate_id,
            utc,
            sequence,
            data: parse_like_externally_tagged(row.name, row.payload)?,
        };

        Ok(event)
    }
}

// reconstruct externally tagged structs
fn parse_like_externally_tagged<T>(
    name: String,
    payload: Option<Json>,
) -> serde_json::error::Result<T>
where
    for<'de> T: Deserialize<'de>,
{
    let value = match payload {
        None => {
            // serialize event payload into a unit struct
            Json::String(name)
        }
        Some(value) => {
            let mut map = serde_json::Map::new();
            map.insert(name, value);

            Json::Object(map)
        }
    };

    serde_json::from_value(value)
}