Skip to main content

durable_streams_server/transfer/
export.rs

1//! Stream export — reads all (or filtered) streams from storage and writes
2//! a self-describing JSON document that can be imported elsewhere.
3
4use super::TransferError;
5use super::format::{ExportDocument, ExportedMessage, ExportedStream, FORMAT_VERSION};
6use crate::protocol::offset::Offset;
7use crate::storage::Storage;
8use base64::Engine;
9use base64::engine::general_purpose::STANDARD as BASE64;
10use chrono::Utc;
11use std::io::Write;
12
/// Options controlling which streams to export.
///
/// The [`Default`] value carries an empty `stream_names` list, which selects
/// every stream.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ExportOptions {
    /// If non-empty, only export streams whose names appear in this list.
    /// An empty list means "export everything".
    pub stream_names: Vec<String>,
}
19
/// Counts returned after a successful export.
///
/// The type is a plain pair of counters, so it is `Copy` and comparable,
/// which makes it convenient to log and to assert on in tests.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ExportStats {
    /// Number of streams written to the output.
    pub streams_exported: usize,
    /// Total number of messages across all exported streams.
    pub messages_exported: usize,
}
27
28/// Export streams from storage to a JSON writer.
29///
30/// Streams are listed via [`Storage::list_streams`], optionally filtered by
31/// `options.stream_names`, then each stream's messages are read in full and
32/// base64-encoded into the output document.
33///
34/// # Errors
35///
36/// Returns [`TransferError`] if a storage operation, JSON serialization, or
37/// I/O write fails.
38pub fn export_streams<W: Write>(
39    storage: &dyn Storage,
40    options: &ExportOptions,
41    writer: W,
42) -> std::result::Result<ExportStats, TransferError> {
43    let all_streams = storage.list_streams()?;
44
45    let mut exported_streams = Vec::new();
46    let mut total_messages = 0usize;
47
48    for (name, metadata) in &all_streams {
49        if !options.stream_names.is_empty() && !options.stream_names.contains(name) {
50            continue;
51        }
52
53        let read_result = storage.read(name, &Offset::start())?;
54
55        let messages = exported_messages(&read_result.messages);
56
57        total_messages += messages.len();
58
59        exported_streams.push(ExportedStream {
60            name: name.clone(),
61            config: metadata.config.clone(),
62            closed: metadata.closed,
63            created_at: metadata.created_at,
64            updated_at: metadata.updated_at,
65            total_bytes: metadata.total_bytes,
66            message_count: metadata.message_count,
67            next_offset: metadata.next_offset.to_string(),
68            messages,
69        });
70    }
71
72    let doc = ExportDocument {
73        format_version: FORMAT_VERSION,
74        exported_at: Utc::now(),
75        server_version: env!("CARGO_PKG_VERSION").to_string(),
76        streams: exported_streams,
77    };
78
79    let streams_exported = doc.streams.len();
80    serde_json::to_writer_pretty(writer, &doc)?;
81
82    Ok(ExportStats {
83        streams_exported,
84        messages_exported: total_messages,
85    })
86}
87
88fn exported_messages(messages: &[bytes::Bytes]) -> Vec<ExportedMessage> {
89    let mut next_read_seq = 0_u64;
90    let mut next_byte_offset = 0_u64;
91
92    messages
93        .iter()
94        .map(|data| {
95            let exported = ExportedMessage {
96                offset: Offset::new(next_read_seq, next_byte_offset).to_string(),
97                data_base64: BASE64.encode(data),
98            };
99            next_read_seq = next_read_seq.saturating_add(1);
100            next_byte_offset = next_byte_offset
101                .saturating_add(u64::try_from(data.len()).expect("message length must fit in u64"));
102            exported
103        })
104        .collect()
105}
106
#[cfg(test)]
mod tests {
    use super::exported_messages;
    use crate::protocol::offset::Offset;
    use bytes::Bytes;

    /// Each exported offset must pair the message index with the cumulative
    /// byte length of all earlier payloads ("hi" = 2 bytes, "there" = 5).
    #[test]
    fn exported_messages_capture_real_byte_offsets() {
        let payloads = [
            Bytes::from_static(b"hi"),
            Bytes::from_static(b"there"),
            Bytes::from_static(b"!"),
        ];
        let expected: Vec<_> = [(0, 0), (1, 2), (2, 7)]
            .iter()
            .map(|&(seq, byte_pos)| Offset::new(seq, byte_pos).to_string())
            .collect();

        let mut actual = Vec::new();
        for message in exported_messages(&payloads) {
            actual.push(message.offset);
        }

        assert_eq!(actual, expected);
    }
}