Skip to main content

durable_streams_server/transfer/
import.rs

1//! Stream import — reads a JSON export document and recreates streams
2//! with their messages in a target storage backend.
3
4use super::TransferError;
5use super::format::{ExportDocument, FORMAT_VERSION};
6use crate::storage::{Storage, StreamConfig};
7use base64::Engine;
8use base64::engine::general_purpose::STANDARD as BASE64;
9use bytes::Bytes;
10use std::io::Read;
11
12/// How to handle streams that already exist during import.
13#[derive(Debug, Clone, Copy)]
14pub enum ConflictPolicy {
15    /// Skip streams that already exist (log a warning, continue).
16    Skip,
17    /// Fail the entire import if any stream already exists (pre-scans
18    /// all names before writing).
19    Fail,
20    /// Delete and recreate existing streams from the export data.
21    Replace,
22}
23
24/// Options controlling import behaviour.
25pub struct ImportOptions {
26    /// Strategy for streams whose names collide with existing data.
27    pub conflict_policy: ConflictPolicy,
28}
29
30/// Counts returned after a successful import.
31pub struct ImportStats {
32    /// Number of streams created (or replaced) from the input.
33    pub streams_imported: usize,
34    /// Number of streams skipped because they already existed.
35    pub streams_skipped: usize,
36    /// Total messages written across all imported streams.
37    pub messages_imported: usize,
38}
39
40/// Import streams from a JSON reader into storage.
41///
42/// The reader must contain a valid [`ExportDocument`]
43/// whose `format_version` matches [`FORMAT_VERSION`].
44/// Each stream is created atomically via
45/// [`Storage::create_stream_with_data`].
46///
47/// # Errors
48///
49/// Returns [`TransferError`] on format version mismatch, storage errors,
50/// base64 decode failures, or conflict policy violations.
51pub fn import_streams<R: Read>(
52    storage: &dyn Storage,
53    reader: R,
54    options: &ImportOptions,
55) -> std::result::Result<ImportStats, TransferError> {
56    let doc: ExportDocument = serde_json::from_reader(reader)?;
57
58    if doc.format_version != FORMAT_VERSION {
59        return Err(TransferError::UnsupportedVersion(doc.format_version));
60    }
61
62    // For Fail policy, pre-scan all stream names before writing anything.
63    if matches!(options.conflict_policy, ConflictPolicy::Fail) {
64        for stream in &doc.streams {
65            if storage.exists(&stream.name) {
66                return Err(TransferError::Conflict(stream.name.clone()));
67            }
68        }
69    }
70
71    let mut stats = ImportStats {
72        streams_imported: 0,
73        streams_skipped: 0,
74        messages_imported: 0,
75    };
76
77    for stream in &doc.streams {
78        if storage.exists(&stream.name) {
79            match options.conflict_policy {
80                ConflictPolicy::Skip => {
81                    eprintln!("Skipping existing stream: {}", stream.name);
82                    stats.streams_skipped += 1;
83                    continue;
84                }
85                ConflictPolicy::Fail => {
86                    return Err(TransferError::Conflict(stream.name.clone()));
87                }
88                ConflictPolicy::Replace => {
89                    storage.delete(&stream.name)?;
90                }
91            }
92        }
93
94        let config = StreamConfig {
95            content_type: stream.config.content_type.clone(),
96            ttl_seconds: stream.config.ttl_seconds,
97            expires_at: stream.config.expires_at,
98            created_closed: stream.config.created_closed,
99        };
100
101        let mut messages = Vec::with_capacity(stream.messages.len());
102        for msg in &stream.messages {
103            let data = BASE64.decode(&msg.data_base64)?;
104            messages.push(Bytes::from(data));
105        }
106
107        let msg_count = messages.len();
108        storage.create_stream_with_data(&stream.name, config, messages, stream.closed)?;
109
110        stats.streams_imported += 1;
111        stats.messages_imported += msg_count;
112    }
113
114    Ok(stats)
115}