use std::convert::TryFrom;
use std::io::Cursor;
use std::path::{Path, PathBuf};
use anyhow::Result;
use async_compression::tokio::{bufread::GzipDecoder, write::GzipEncoder};
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt},
};
use tokio_tar::Archive;
use wasmcloud_control_interface::HostInventory;
pub const INVENTORY_FILE: &str = "inventory.json";
pub const MESSAGES_DIR: &str = "messages";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableMessage {
pub subject: String,
pub reply: Option<String>,
pub payload: bytes::Bytes,
pub description: Option<String>,
pub length: usize,
pub published: time::OffsetDateTime,
}
impl TryFrom<async_nats::jetstream::Message> for SerializableMessage {
type Error = anyhow::Error;
fn try_from(msg: async_nats::jetstream::Message) -> Result<Self, Self::Error> {
let published = msg.info().map_err(|e| anyhow::anyhow!("{e:?}"))?.published;
Ok(Self {
subject: msg.message.subject.to_string(),
reply: msg.message.reply.map(|s| s.to_string()),
payload: msg.message.payload,
description: msg.message.description,
length: msg.message.length,
published,
})
}
}
pub struct ReadCapture {
pub inventory: Vec<HostInventory>,
pub messages: Vec<SerializableMessage>,
}
impl ReadCapture {
pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
let file = File::open(path).await?;
let mut archive = Archive::new(GzipDecoder::new(tokio::io::BufReader::new(file)));
let mut capture = ReadCapture {
inventory: Vec::new(),
messages: Vec::new(),
};
let mut entries = archive.entries()?;
while let Some(mut entry) = entries.try_next().await? {
let path = entry.path()?;
if path.file_name().unwrap_or_default() == INVENTORY_FILE {
let mut buf = Vec::new();
entry.read_to_end(&mut buf).await?;
capture.inventory = serde_json::from_slice(&buf)?;
} else if path
.parent()
.and_then(|p| p.file_name())
.unwrap_or_default()
== MESSAGES_DIR
&& path.extension().unwrap_or_default() == "json"
{
let mut buf = Vec::new();
entry.read_to_end(&mut buf).await?;
let msg: SerializableMessage = serde_json::from_slice(&buf)?;
capture.messages.push(msg);
}
}
Ok(capture)
}
}
pub struct WriteCapture {
builder: tokio_tar::Builder<GzipEncoder<File>>,
current_index: usize,
}
impl WriteCapture {
pub async fn start(inventory: Vec<HostInventory>, path: impl AsRef<Path>) -> Result<Self> {
let file = File::create(path).await?;
let encoder = GzipEncoder::new(file);
let mut builder = tokio_tar::Builder::new(encoder);
let inventory_data = serde_json::to_vec(&inventory)?;
let mut header = tokio_tar::Header::new_gnu();
header.set_size(inventory_data.len() as u64);
header.set_cksum();
builder
.append_data(&mut header, INVENTORY_FILE, Cursor::new(inventory_data))
.await?;
Ok(Self {
builder,
current_index: 0,
})
}
pub async fn add_message(&mut self, msg: SerializableMessage) -> Result<()> {
let data = serde_json::to_vec(&msg)?;
let mut header = tokio_tar::Header::new_gnu();
header.set_size(data.len() as u64);
header.set_cksum();
let path = PathBuf::from(MESSAGES_DIR).join(format!(
"{}-{}.json",
self.current_index,
msg.published
.format(&time::format_description::well_known::Rfc3339)?
));
self.builder
.append_data(&mut header, path, Cursor::new(data))
.await?;
self.current_index += 1;
Ok(())
}
pub async fn finish(self) -> Result<()> {
let mut encoder = self.builder.into_inner().await?;
encoder.flush().await?;
encoder.shutdown().await?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
#[tokio::test]
async fn test_roundtrip() {
let tempdir = tempfile::tempdir().unwrap();
let tarball = tempdir.path().join("capture.tar.gz");
let mut capture = WriteCapture::start(
vec![HostInventory {
host_id: "test".to_string(),
..Default::default()
}],
&tarball,
)
.await
.expect("Should be able to start a capture");
capture
.add_message(SerializableMessage {
subject: "first".to_string(),
reply: None,
payload: bytes::Bytes::from("test"),
description: None,
length: 5,
published: time::OffsetDateTime::now_utc(),
})
.await
.expect("Should be able to add a message");
capture
.add_message(SerializableMessage {
subject: "second".to_string(),
reply: None,
payload: bytes::Bytes::from("test"),
description: None,
length: 6,
published: time::OffsetDateTime::now_utc(),
})
.await
.expect("Should be able to add a message");
capture
.finish()
.await
.expect("Should be able to finish a capture");
let capture = ReadCapture::load(&tarball)
.await
.expect("Should be able to load a capture");
assert_eq!(
capture.inventory.len(),
1,
"Should have the correct inventory"
);
assert_eq!(
capture.inventory[0].host_id, "test",
"Should have the correct inventory"
);
assert_eq!(
capture.messages.len(),
2,
"Should have the right amount of messages"
);
assert_eq!(
capture.messages[0].subject, "first",
"Should have the right ordering"
);
assert_eq!(
capture.messages[1].subject, "second",
"Should have the right ordering"
);
}
}