use std::time::{Duration, SystemTime, UNIX_EPOCH};
use async_nats::{jetstream, Message};
use serde_json::Deserializer;
use uuid::Uuid;
use super::header::{self, VERSION_KEY};
use super::subject::NatsSubject;
use crate::envelope::Envelope;
use crate::error::{self, Error};
use crate::event::{Event, Sequence};
use crate::version::DeserializeVersion;
/// An event envelope built from a NATS JetStream message.
///
/// Instances are created with `NatsEnvelope::try_from_message`, which fills the
/// metadata fields from the message subject, its headers and its JetStream info.
pub struct NatsEnvelope {
/// Identifier parsed from the message subject.
id: Uuid,
/// Stream sequence number taken from the JetStream message info.
sequence: u64,
/// Publish time taken from the JetStream message info, as a Unix timestamp in whole seconds.
timestamp: i64,
/// Name parsed from the message subject; compared with the event type's name on deserialization.
name: String,
/// Version parsed from the message's version header; handed to the versioned deserializer.
version: usize,
/// The underlying NATS message; its payload is what gets deserialized into an event.
message: Message,
}
impl NatsEnvelope {
pub fn try_from_message(
expected_prefix: &str,
message: jetstream::Message,
) -> error::Result<Self> {
let NatsSubject::Aggregate(name, id) =
NatsSubject::try_from_str(expected_prefix, message.subject.as_str())?
else {
return Err(Error::Invalid);
};
let version = header::get(&message, VERSION_KEY)
.ok_or(Error::Invalid)?
.parse::<usize>()
.map_err(|e| Error::Format(e.into()))?;
let (sequence, timestamp) = {
let info = message.info().map_err(Error::Internal)?;
(info.stream_sequence, info.published.unix_timestamp())
};
Ok(Self {
id,
sequence,
timestamp,
name: name.into_owned(),
version,
message: message.split().0,
})
}
}
impl Envelope for NatsEnvelope {
    /// Returns the identifier parsed from the message subject.
    fn id(&self) -> Uuid {
        self.id
    }

    /// Returns the stream sequence number of the message, converted into a `Sequence`.
    fn sequence(&self) -> Sequence {
        self.sequence.into()
    }

    /// Returns the publish time of the message with whole-second precision.
    ///
    /// The stored timestamp is a signed number of seconds relative to the Unix
    /// epoch. Negative values are mapped to instants before the epoch. If the
    /// resulting instant cannot be represented by `SystemTime` on the current
    /// platform, `UNIX_EPOCH` is returned instead of panicking.
    fn timestamp(&self) -> SystemTime {
        // A plain `as u64` cast would wrap a negative timestamp into a huge offset
        // and make `UNIX_EPOCH + offset` panic on overflow, so work with the
        // magnitude and pick the direction from the sign.
        let offset = Duration::from_secs(self.timestamp.unsigned_abs());
        let instant = if self.timestamp >= 0 {
            UNIX_EPOCH.checked_add(offset)
        } else {
            UNIX_EPOCH.checked_sub(offset)
        };
        instant.unwrap_or(UNIX_EPOCH)
    }

    /// Returns the name parsed from the message subject.
    fn name(&self) -> &str {
        &self.name
    }

    /// Deserializes the message payload into the event type `E`.
    ///
    /// The payload is read through a `serde_json` deserializer and decoded by
    /// `E::deserialize_version` using the version stored in this envelope.
    ///
    /// # Errors
    ///
    /// * `Error::Invalid` when the envelope's name differs from `E::name()`.
    /// * `Error::Format` when the payload cannot be deserialized.
    fn deserialize<'de, E>(&'de self) -> error::Result<E>
    where
        E: DeserializeVersion<'de> + Event,
    {
        // Refuse to decode a payload that belongs to a different event type.
        if self.name != E::name() {
            return Err(Error::Invalid);
        }
        let mut deserializer = Deserializer::from_slice(&self.message.payload);
        E::deserialize_version(&mut deserializer, self.version).map_err(|e| Error::Format(e.into()))
    }
}