use std::convert::TryFrom;
use std::io::Cursor;
use std::path::{Path, PathBuf};

use anyhow::Result;
use async_compression::tokio::{bufread::GzipDecoder, write::GzipEncoder};
use async_nats::HeaderMap;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use tokio::{
    fs::File,
    io::{AsyncReadExt, AsyncWriteExt},
};
use tokio_tar::Archive;
use wasmcloud_control_interface::HostInventory;

pub const INVENTORY_FILE: &str = "inventory.json";
pub const MESSAGES_DIR: &str = "messages";

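/// A NATS JetStream message in a form that can be serialized to JSON for storage in a
/// capture archive and deserialized back when the capture is read.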
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableMessage {
    pub subject: String,
    pub reply: Option<String>,
    pub payload: bytes::Bytes,
    pub description: Option<String>,
    pub length: usize,
    pub published: time::OffsetDateTime,
    pub headers: Option<HeaderMap>,
}

impl TryFrom<async_nats::jetstream::Message> for SerializableMessage {
    type Error = anyhow::Error;

    fn try_from(msg: async_nats::jetstream::Message) -> Result<Self, Self::Error> {
        // The publish time is only available from the JetStream metadata, which is parsed
        // on demand and can fail, so extract it before moving fields out of the message.
        let published = msg.info().map_err(|e| anyhow::anyhow!("{e:?}"))?.published;
        Ok(Self {
            subject: msg.message.subject.to_string(),
            reply: msg.message.reply.map(|s| s.to_string()),
            payload: msg.message.payload,
            description: msg.message.description,
            length: msg.message.length,
            headers: msg.message.headers,
            published,
        })
    }
}

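/// A capture archive that has been loaded into memory, containing the host inventory and
/// all recorded messages.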
pub struct ReadCapture {
    pub inventory: Vec<HostInventory>,
    pub messages: Vec<SerializableMessage>,
}

impl ReadCapture {
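    /// Loads a capture from the gzipped tarball at the given path, reading the host
    /// inventory and every recorded message into memory.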
    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
        let file = File::open(&path).await.map_err(|e| {
            std::io::Error::new(
                e.kind(),
                format!(
                    "failed to load capture from file [{}]: {e}",
                    path.as_ref().display()
                ),
            )
        })?;
        let mut archive = Archive::new(GzipDecoder::new(tokio::io::BufReader::new(file)));

        let mut capture = ReadCapture {
            inventory: Vec::new(),
            messages: Vec::new(),
        };
        let mut entries = archive.entries()?;
        while let Some(mut entry) = entries.try_next().await? {
            let path = entry.path()?;
            if path.file_name().unwrap_or_default() == INVENTORY_FILE {
                let mut buf = Vec::new();
                entry.read_to_end(&mut buf).await?;
                capture.inventory = serde_json::from_slice(&buf)?;
            } else if path
                .parent()
                .and_then(|p| p.file_name())
                .unwrap_or_default()
                == MESSAGES_DIR
                && path.extension().unwrap_or_default() == "json"
            {
                let mut buf = Vec::new();
                entry.read_to_end(&mut buf).await?;
                let msg: SerializableMessage = serde_json::from_slice(&buf)?;
                capture.messages.push(msg);
            }
        }
        Ok(capture)
    }
}

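/// A writer that streams a capture to disk as a gzipped tarball.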
pub struct WriteCapture {
    builder: tokio_tar::Builder<GzipEncoder<File>>,
    current_index: usize,
}

impl WriteCapture {
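    /// Starts a new capture at the given path, serializing the provided host inventory as
    /// JSON and writing it to the root of the archive as [`INVENTORY_FILE`].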
    pub async fn start(inventory: Vec<HostInventory>, path: impl AsRef<Path>) -> Result<Self> {
        let file = File::create(path).await?;
        let encoder = GzipEncoder::new(file);
        let mut builder = tokio_tar::Builder::new(encoder);
        let inventory_data = serde_json::to_vec(&inventory)?;
        let mut header = tokio_tar::Header::new_gnu();
        header.set_size(inventory_data.len() as u64);
        header.set_cksum();
        builder
            .append_data(&mut header, INVENTORY_FILE, Cursor::new(inventory_data))
            .await?;
        Ok(Self {
            builder,
            current_index: 0,
        })
    }

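    /// Appends a message to the capture. Each message is stored under [`MESSAGES_DIR`] in a
    /// file named with an incrementing index and the message's RFC 3339 publish timestamp,
    /// so messages are read back in the order they were added.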
    pub async fn add_message(&mut self, msg: SerializableMessage) -> Result<()> {
        let data = serde_json::to_vec(&msg)?;
        let mut header = tokio_tar::Header::new_gnu();
        header.set_size(data.len() as u64);
        header.set_cksum();
        let path = PathBuf::from(MESSAGES_DIR).join(format!(
            "{}-{}.json",
            self.current_index,
            msg.published
                .format(&time::format_description::well_known::Rfc3339)?
        ));
        self.builder
            .append_data(&mut header, path, Cursor::new(data))
            .await?;
        self.current_index += 1;
        Ok(())
    }

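    /// Finishes the capture, flushing and shutting down the underlying gzip encoder. This
    /// should be called once all messages have been added; otherwise the archive may be
    /// left incomplete.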
    pub async fn finish(self) -> Result<()> {
        let mut encoder = self.builder.into_inner().await?;
        encoder.flush().await?;
        encoder.shutdown().await?;
        Ok(())
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[tokio::test]
    async fn test_roundtrip() {
        let tempdir = tempfile::tempdir().unwrap();
        let tarball = tempdir.path().join("capture.tar.gz");
        let mut capture = WriteCapture::start(
            vec![HostInventory::builder()
                .host_id("test".into())
                .friendly_name("test".into())
                .version("1.0.0".into())
                .uptime_human("t".into())
                .uptime_seconds(100)
                .build()
                .expect("failed to build host inventory")],
            &tarball,
        )
        .await
        .expect("Should be able to start a capture");
        capture
            .add_message(SerializableMessage {
                subject: "first".to_string(),
                reply: None,
                payload: bytes::Bytes::from("test"),
                description: None,
                length: 5,
                published: time::OffsetDateTime::now_utc(),
                headers: None,
            })
            .await
            .expect("Should be able to add a message");
        capture
            .add_message(SerializableMessage {
                subject: "second".to_string(),
                reply: None,
                payload: bytes::Bytes::from("test"),
                description: None,
                length: 6,
                published: time::OffsetDateTime::now_utc(),
                headers: None,
            })
            .await
            .expect("Should be able to add a message");

        capture
            .finish()
            .await
            .expect("Should be able to finish a capture");

        let capture = ReadCapture::load(&tarball)
            .await
            .expect("Should be able to load a capture");

        assert_eq!(
            capture.inventory.len(),
            1,
            "Should have the correct inventory"
        );
        assert_eq!(
            capture.inventory[0].host_id(),
            "test",
            "Should have the correct inventory"
        );
        assert_eq!(
            capture.messages.len(),
            2,
            "Should have the right amount of messages"
        );
        assert_eq!(
            capture.messages[0].subject, "first",
            "Should have the right ordering"
        );
        assert_eq!(
            capture.messages[1].subject, "second",
            "Should have the right ordering"
        );
    }
}