durable_streams_server/transfer/
import.rs1use super::TransferError;
5use super::format::{ExportDocument, FORMAT_VERSION};
6use crate::storage::{Storage, StreamConfig};
7use base64::Engine;
8use base64::engine::general_purpose::STANDARD as BASE64;
9use bytes::Bytes;
10use std::io::Read;
11
12#[derive(Debug, Clone, Copy)]
14pub enum ConflictPolicy {
15 Skip,
17 Fail,
20 Replace,
22}
23
24pub struct ImportOptions {
26 pub conflict_policy: ConflictPolicy,
28}
29
30pub struct ImportStats {
32 pub streams_imported: usize,
34 pub streams_skipped: usize,
36 pub messages_imported: usize,
38}
39
40pub fn import_streams<R: Read>(
52 storage: &dyn Storage,
53 reader: R,
54 options: &ImportOptions,
55) -> std::result::Result<ImportStats, TransferError> {
56 let doc: ExportDocument = serde_json::from_reader(reader)?;
57
58 if doc.format_version != FORMAT_VERSION {
59 return Err(TransferError::UnsupportedVersion(doc.format_version));
60 }
61
62 if matches!(options.conflict_policy, ConflictPolicy::Fail) {
64 for stream in &doc.streams {
65 if storage.exists(&stream.name) {
66 return Err(TransferError::Conflict(stream.name.clone()));
67 }
68 }
69 }
70
71 let mut stats = ImportStats {
72 streams_imported: 0,
73 streams_skipped: 0,
74 messages_imported: 0,
75 };
76
77 for stream in &doc.streams {
78 if storage.exists(&stream.name) {
79 match options.conflict_policy {
80 ConflictPolicy::Skip => {
81 eprintln!("Skipping existing stream: {}", stream.name);
82 stats.streams_skipped += 1;
83 continue;
84 }
85 ConflictPolicy::Fail => {
86 return Err(TransferError::Conflict(stream.name.clone()));
87 }
88 ConflictPolicy::Replace => {
89 storage.delete(&stream.name)?;
90 }
91 }
92 }
93
94 let config = StreamConfig {
95 content_type: stream.config.content_type.clone(),
96 ttl_seconds: stream.config.ttl_seconds,
97 expires_at: stream.config.expires_at,
98 created_closed: stream.config.created_closed,
99 };
100
101 let mut messages = Vec::with_capacity(stream.messages.len());
102 for msg in &stream.messages {
103 let data = BASE64.decode(&msg.data_base64)?;
104 messages.push(Bytes::from(data));
105 }
106
107 let msg_count = messages.len();
108 storage.create_stream_with_data(&stream.name, config, messages, stream.closed)?;
109
110 stats.streams_imported += 1;
111 stats.messages_imported += msg_count;
112 }
113
114 Ok(stats)
115}