durable_streams_server/transfer/
export.rs1use super::TransferError;
5use super::format::{ExportDocument, ExportedMessage, ExportedStream, FORMAT_VERSION};
6use crate::protocol::offset::Offset;
7use crate::storage::Storage;
8use base64::Engine;
9use base64::engine::general_purpose::STANDARD as BASE64;
10use chrono::Utc;
11use std::io::Write;
12
/// Options controlling which streams an export includes.
pub struct ExportOptions {
    /// Names of the streams to export. An empty list selects every stream.
    pub stream_names: Vec<String>,
}
19
/// Summary counts describing a completed export.
pub struct ExportStats {
    /// Number of streams included in the export document.
    pub streams_exported: usize,
    /// Total number of messages across all exported streams.
    pub messages_exported: usize,
}
27
28pub fn export_streams<W: Write>(
39 storage: &dyn Storage,
40 options: &ExportOptions,
41 writer: W,
42) -> std::result::Result<ExportStats, TransferError> {
43 let all_streams = storage.list_streams()?;
44
45 let mut exported_streams = Vec::new();
46 let mut total_messages = 0usize;
47
48 for (name, metadata) in &all_streams {
49 if !options.stream_names.is_empty() && !options.stream_names.contains(name) {
50 continue;
51 }
52
53 let read_result = storage.read(name, &Offset::start())?;
54
55 let messages = exported_messages(&read_result.messages);
56
57 total_messages += messages.len();
58
59 exported_streams.push(ExportedStream {
60 name: name.clone(),
61 config: metadata.config.clone(),
62 closed: metadata.closed,
63 created_at: metadata.created_at,
64 updated_at: metadata.updated_at,
65 total_bytes: metadata.total_bytes,
66 message_count: metadata.message_count,
67 next_offset: metadata.next_offset.to_string(),
68 messages,
69 });
70 }
71
72 let doc = ExportDocument {
73 format_version: FORMAT_VERSION,
74 exported_at: Utc::now(),
75 server_version: env!("CARGO_PKG_VERSION").to_string(),
76 streams: exported_streams,
77 };
78
79 let streams_exported = doc.streams.len();
80 serde_json::to_writer_pretty(writer, &doc)?;
81
82 Ok(ExportStats {
83 streams_exported,
84 messages_exported: total_messages,
85 })
86}
87
88fn exported_messages(messages: &[bytes::Bytes]) -> Vec<ExportedMessage> {
89 let mut next_read_seq = 0_u64;
90 let mut next_byte_offset = 0_u64;
91
92 messages
93 .iter()
94 .map(|data| {
95 let exported = ExportedMessage {
96 offset: Offset::new(next_read_seq, next_byte_offset).to_string(),
97 data_base64: BASE64.encode(data),
98 };
99 next_read_seq = next_read_seq.saturating_add(1);
100 next_byte_offset = next_byte_offset
101 .saturating_add(u64::try_from(data.len()).expect("message length must fit in u64"));
102 exported
103 })
104 .collect()
105}
106
#[cfg(test)]
mod tests {
    use super::exported_messages;
    use crate::protocol::offset::Offset;
    use bytes::Bytes;

    /// Payloads of 2, 5 and 1 bytes must yield byte offsets 0, 2 and 7,
    /// paired with read sequences 0, 1 and 2.
    #[test]
    fn exported_messages_capture_real_byte_offsets() {
        let payloads = [
            Bytes::from_static(b"hi"),
            Bytes::from_static(b"there"),
            Bytes::from_static(b"!"),
        ];

        let expected: Vec<String> = [(0, 0), (1, 2), (2, 7)]
            .iter()
            .map(|&(read_seq, byte_offset)| Offset::new(read_seq, byte_offset).to_string())
            .collect();

        let actual: Vec<_> = exported_messages(&payloads)
            .into_iter()
            .map(|message| message.offset)
            .collect();

        assert_eq!(actual, expected);
    }
}