wash_lib/capture.rs

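//! Utilities for writing and reading wasmCloud capture tarballs: gzipped tar archives that bundle
//! a snapshot of host inventory together with observed NATS messages.
//!
//! Inside the archive, the inventory is stored as `inventory.json` at the root and each message is
//! stored as its own JSON file under `messages/`, named with an incrementing index and the
//! message's publication timestamp.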
use std::convert::TryFrom;
use std::io::Cursor;
use std::path::{Path, PathBuf};

use anyhow::Result;
use async_compression::tokio::{bufread::GzipDecoder, write::GzipEncoder};
use async_nats::HeaderMap;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use tokio::{
    fs::File,
    io::{AsyncReadExt, AsyncWriteExt},
};
use tokio_tar::Archive;
use wasmcloud_control_interface::HostInventory;

pub const INVENTORY_FILE: &str = "inventory.json";
pub const MESSAGES_DIR: &str = "messages";

/// A subset of NATS message info that we need to serialize for now. Basically it is all of the fields that can be serialized easily.
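///
/// Instances are usually produced by converting an [`async_nats::jetstream::Message`] pulled from
/// a consumer. A rough sketch of that flow (marked `ignore` because the stream name, consumer
/// config, and exact `async_nats` API shape are assumptions that vary by version):
///
/// ```ignore
/// use futures::TryStreamExt;
///
/// let client = async_nats::connect("nats://127.0.0.1:4222").await?;
/// let js = async_nats::jetstream::new(client);
/// let stream = js.get_stream("my-stream").await?;
/// let consumer = stream
///     .create_consumer(async_nats::jetstream::consumer::pull::Config::default())
///     .await?;
///
/// let mut messages = consumer.messages().await?;
/// while let Some(msg) = messages.try_next().await? {
///     let serializable = SerializableMessage::try_from(msg)?;
///     // ...hand it to a `WriteCapture` or inspect it directly
/// }
/// ```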
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableMessage {
    pub subject: String,
    pub reply: Option<String>,
    pub payload: bytes::Bytes,
    pub description: Option<String>,
    pub length: usize,
    pub published: time::OffsetDateTime,
    pub headers: Option<HeaderMap>,
}

impl TryFrom<async_nats::jetstream::Message> for SerializableMessage {
    type Error = anyhow::Error;

    fn try_from(msg: async_nats::jetstream::Message) -> Result<Self, Self::Error> {
        let published = msg.info().map_err(|e| anyhow::anyhow!("{e:?}"))?.published;
        Ok(Self {
            subject: msg.message.subject.to_string(),
            reply: msg.message.reply.map(|s| s.to_string()),
            payload: msg.message.payload,
            description: msg.message.description,
            length: msg.message.length,
            headers: msg.message.headers,
            published,
        })
    }
}

/// A read capture is a parsed tarball that contains all of the messages and inventory for a given
/// capture.
///
/// Currently this only loads the data into memory, but we might add additional helper methods in
/// the future.
///
/// NOTE: The interior structure of the tarball is not a guaranteed API and may change in the
/// future. All interactions should be done through this type.
pub struct ReadCapture {
    pub inventory: Vec<HostInventory>,
    // NOTE: A further optimization would be to only load based off of a filter here rather than
    // possibly having thousands of messages
    pub messages: Vec<SerializableMessage>,
}

impl ReadCapture {
    /// Loads the given capture file from the path and returns all of the data
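    ///
    /// A minimal usage sketch (marked `ignore`; the file name here is illustrative):
    ///
    /// ```ignore
    /// let capture = ReadCapture::load("capture.tar.gz").await?;
    /// println!("captured {} host(s)", capture.inventory.len());
    /// for msg in &capture.messages {
    ///     println!("{} ({} bytes)", msg.subject, msg.length);
    /// }
    /// ```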
    pub async fn load(path: impl AsRef<Path>) -> Result<Self> {
        let file = File::open(&path).await.map_err(|e| {
            std::io::Error::new(
                e.kind(),
                format!(
                    "failed to load capture from file [{}]: {e}",
                    path.as_ref().display()
                ),
            )
        })?;
        let mut archive = Archive::new(GzipDecoder::new(tokio::io::BufReader::new(file)));

        let mut capture = ReadCapture {
            inventory: Vec::new(),
            messages: Vec::new(),
        };
        let mut entries = archive.entries()?;
        while let Some(mut entry) = entries.try_next().await? {
            let path = entry.path()?;
            if path.file_name().unwrap_or_default() == INVENTORY_FILE {
                let mut buf = Vec::new();
                entry.read_to_end(&mut buf).await?;
                // serde_json can't deserialize from an async reader, so buffer the whole entry
                // into memory first
                capture.inventory = serde_json::from_slice(&buf)?;
            } else if path
                .parent()
                .and_then(|p| p.file_name())
                .unwrap_or_default()
                == MESSAGES_DIR
                && path.extension().unwrap_or_default() == "json"
            {
                // For any path that matches messages/*.json, read the file and deserialize it.
                // Tar entries should come back in the same order they were written, so the
                // messages stay in publication order
                let mut buf = Vec::new();
                entry.read_to_end(&mut buf).await?;
                let msg: SerializableMessage = serde_json::from_slice(&buf)?;
                capture.messages.push(msg);
            }
        }
        Ok(capture)
    }
}

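/// An incremental writer for capture tarballs: the host inventory is written as the first entry,
/// and each observed message is appended as its own JSON file until [`WriteCapture::finish`] is
/// called.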
pub struct WriteCapture {
    builder: tokio_tar::Builder<GzipEncoder<File>>,
    current_index: usize,
}

impl WriteCapture {
    /// Create a new `WriteCapture` that will write the capture tarball to the given path with the
    /// expected inventory
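    ///
    /// A minimal end-to-end sketch (marked `ignore`; the inventory values and output path are
    /// illustrative, and `message` stands in for a [`SerializableMessage`] captured elsewhere):
    ///
    /// ```ignore
    /// let inventory = vec![HostInventory::builder()
    ///     .host_id("host-id".into())
    ///     .friendly_name("my-host".into())
    ///     .version("1.0.0".into())
    ///     .uptime_human("1m".into())
    ///     .uptime_seconds(60)
    ///     .build()
    ///     .expect("failed to build host inventory")];
    ///
    /// let mut capture = WriteCapture::start(inventory, "capture.tar.gz").await?;
    /// capture.add_message(message).await?;
    /// capture.finish().await?;
    /// ```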
    pub async fn start(inventory: Vec<HostInventory>, path: impl AsRef<Path>) -> Result<Self> {
        let file = File::create(path).await?;
        let encoder = GzipEncoder::new(file);
        let mut builder = tokio_tar::Builder::new(encoder);
        // We always start by encoding the inventory first
        let inventory_data = serde_json::to_vec(&inventory)?;
        let mut header = tokio_tar::Header::new_gnu();
        header.set_size(inventory_data.len() as u64);
        header.set_cksum();
        builder
            .append_data(&mut header, INVENTORY_FILE, Cursor::new(inventory_data))
            .await?;
        Ok(Self {
            builder,
            current_index: 0,
        })
    }

    /// Adds an observed message to the capture
    pub async fn add_message(&mut self, msg: SerializableMessage) -> Result<()> {
        // NOTE(thomastaylor312): If encoding in json becomes a bottleneck, we can switch to a more
        // efficient format, but I figured this could be easier for people to read if someone
        // unpacks the message themselves
        let data = serde_json::to_vec(&msg)?;
        let mut header = tokio_tar::Header::new_gnu();
        header.set_size(data.len() as u64);
        header.set_cksum();
        // Name of the file is an incrementing index and the timestamp of the message
        let path = PathBuf::from(MESSAGES_DIR).join(format!(
            "{}-{}.json",
            self.current_index,
            msg.published
                .format(&time::format_description::well_known::Rfc3339)?
        ));
        self.builder
            .append_data(&mut header, path, Cursor::new(data))
            .await?;
        self.current_index += 1;
        Ok(())
    }

    /// Marks the tarball write as complete and flushes the underlying writer to disk
    pub async fn finish(self) -> Result<()> {
        let mut encoder = self.builder.into_inner().await?;
        encoder.flush().await?;
        encoder.shutdown().await?;
        Ok(())
    }
}

#[cfg(test)]
mod test {
    use super::*;

    #[tokio::test]
    async fn test_roundtrip() {
        let tempdir = tempfile::tempdir().unwrap();
        let tarball = tempdir.path().join("capture.tar.gz");
        let mut capture = WriteCapture::start(
            vec![HostInventory::builder()
                .host_id("test".into())
                .friendly_name("test".into())
                .version("1.0.0".into())
                .uptime_human("t".into())
                .uptime_seconds(100)
                .build()
                .expect("failed to build host inventory")],
            &tarball,
        )
        .await
        .expect("Should be able to start a capture");
        capture
            .add_message(SerializableMessage {
                subject: "first".to_string(),
                reply: None,
                payload: bytes::Bytes::from("test"),
                description: None,
                length: 5,
                published: time::OffsetDateTime::now_utc(),
                headers: None,
            })
            .await
            .expect("Should be able to add a message");
        capture
            .add_message(SerializableMessage {
                subject: "second".to_string(),
                reply: None,
                payload: bytes::Bytes::from("test"),
                description: None,
                length: 6,
                published: time::OffsetDateTime::now_utc(),
                headers: None,
            })
            .await
            .expect("Should be able to add a message");

        capture
            .finish()
            .await
            .expect("Should be able to finish a capture");

        let capture = ReadCapture::load(&tarball)
            .await
            .expect("Should be able to load a capture");

        assert_eq!(
            capture.inventory.len(),
            1,
            "Should have the correct inventory"
        );
        assert_eq!(
            capture.inventory[0].host_id(),
            "test",
            "Should have the correct inventory"
        );
        assert_eq!(
            capture.messages.len(),
            2,
            "Should have the right number of messages"
        );
        assert_eq!(
            capture.messages[0].subject, "first",
            "Should have the right ordering"
        );
        assert_eq!(
            capture.messages[1].subject, "second",
            "Should have the right ordering"
        );
    }
}