rs_rawzips2blobs2jsons/
lib.rs

1use base64::{Engine as _, engine::general_purpose};
2use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
3use rawzip::{ZipArchive, time::ZipDateTimeKind};
4use serde::Serialize;
5use std::fmt;
6use std::fs::File;
7use std::io::{self, BufRead, BufWriter, Read, Write};
8use std::path::Path;
9
/// A custom error type to distinguish I/O errors from size limit errors.
#[derive(Debug)]
pub enum ReadError {
    /// An underlying I/O failure while reading.
    Io(io::Error),
    /// The input was longer than the caller-supplied byte limit.
    SizeLimitExceeded,
}

impl fmt::Display for ReadError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ReadError::Io(e) => write!(f, "{}", e),
            ReadError::SizeLimitExceeded => write!(f, "file size exceeds limit"),
        }
    }
}

// Implementing the standard Error trait lets ReadError be boxed as
// `dyn Error`, chained via `source()`, and used with `?` conversions.
impl std::error::Error for ReadError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            ReadError::Io(e) => Some(e),
            ReadError::SizeLimitExceeded => None,
        }
    }
}

impl From<io::Error> for ReadError {
    fn from(err: io::Error) -> ReadError {
        ReadError::Io(err)
    }
}
31
/// Per-entry metadata attached to every emitted [`Blob`].
#[derive(Serialize, Debug)]
pub struct Metadata {
    /// Path of the zip archive the entry came from; serialized as `"ZipName"`.
    #[serde(rename = "ZipName")]
    pub zip_name: String,
}
37
/// One zip archive entry, serialized as a self-describing JSON object.
#[derive(Serialize, Debug)]
pub struct Blob {
    /// Entry path inside the archive (lossily converted from raw bytes).
    pub name: String,
    /// Caller-supplied content type copied onto every blob.
    pub content_type: String,
    /// Caller-supplied content encoding copied onto every blob.
    pub content_encoding: String,
    /// Always "base64" — see `body`.
    pub content_transfer_encoding: String,
    /// Base64 (standard alphabet) encoding of the entry's bytes.
    pub body: String,
    /// Archive-level metadata (the source zip's name).
    pub metadata: Metadata,
    /// Length in bytes of the entry data BEFORE base64 encoding.
    pub content_length: u64,
    /// Entry modification time rendered as an RFC 3339 timestamp (UTC).
    pub last_modified: String,
}
49
50fn zip_datetime_to_chrono_utc(zdt: &ZipDateTimeKind) -> DateTime<Utc> {
51    let (year, month, day, hour, minute, second) = (
52        zdt.year(),
53        zdt.month(),
54        zdt.day(),
55        zdt.hour(),
56        zdt.minute(),
57        zdt.second(),
58    );
59    let naive_date =
60        NaiveDate::from_ymd_opt(year as i32, month as u32, day as u32).unwrap_or_default();
61    let naive_time = chrono::NaiveTime::from_hms_opt(hour as u32, minute as u32, second as u32)
62        .unwrap_or_default();
63    let naive_dt = NaiveDateTime::new(naive_date, naive_time);
64    DateTime::from_naive_utc_and_offset(naive_dt, Utc)
65}
66
67pub fn rdr2buf<R>(rdr: R, buf: &mut Vec<u8>, limit: u64) -> Result<(), ReadError>
68where
69    R: Read,
70{
71    let mut taken = rdr.take(limit + 1);
72    buf.clear();
73    taken.read_to_end(buf)?;
74    if buf.len() as u64 > limit {
75        return Err(ReadError::SizeLimitExceeded);
76    }
77    Ok(())
78}
79
80pub fn filename2buf<P>(filename: P, buf: &mut Vec<u8>, limit: u64) -> Result<(), ReadError>
81where
82    P: AsRef<Path>,
83{
84    let f = File::open(filename)?;
85    rdr2buf(f, buf, limit)
86}
87
/// Interpret each line of `rdr` as a filename, yielding one `Result` per line.
fn rdr2filenames<R>(rdr: R) -> impl Iterator<Item = Result<String, io::Error>>
where
    R: BufRead,
{
    BufRead::lines(rdr)
}
94
95fn stdin2filenames() -> impl Iterator<Item = Result<String, io::Error>> {
96    rdr2filenames(io::stdin().lock())
97}
98
/// Expand a zip archive held in memory into newline-delimited JSON blobs.
///
/// Writes one JSON-serialized [`Blob`] per archive entry to `wtr`, each
/// followed by a newline. Entries larger than `max_item_size` are skipped
/// (logged to stderr when `verbose` is set) instead of failing the archive.
///
/// # Errors
/// Returns `io::Error` when the archive cannot be parsed (rawzip errors are
/// wrapped via `io::Error::other`) or when writing to `wtr` fails.
pub fn buf2zip2blobs2jsons2writer<W>(
    zip_name: &str,
    zipdata: &[u8],
    content_type: &str,
    content_encoding: &str,
    max_item_size: u64,
    verbose: bool,
    wtr: &mut BufWriter<W>,
) -> Result<(), io::Error>
where
    W: Write,
{
    let archive = ZipArchive::from_slice(zipdata).map_err(io::Error::other)?;

    for entry_result in archive.entries() {
        let entry_header = entry_result.map_err(io::Error::other)?;
        let wayfinder = entry_header.wayfinder();
        let entry = archive.get_entry(wayfinder).map_err(io::Error::other)?;
        // NOTE(review): assumes `entry.data()` yields the entry's payload
        // bytes — confirm against the rawzip API whether this is the
        // decompressed content or the stored (possibly deflated) bytes,
        // since `body`/`content_length` below are built from it directly.
        let entry_data = entry.data();
        // Zip entry names are not guaranteed UTF-8; convert lossily.
        let file_name = String::from_utf8_lossy(entry_header.file_path().as_bytes()).to_string();

        if entry_data.len() as u64 > max_item_size {
            if verbose {
                eprintln!(
                    "level:warn\tstatus:item_skipped\treason:size_limit_exceeded\tpath:{}\titem:{}\tsize:{}",
                    zip_name,
                    file_name,
                    entry_data.len()
                );
            }
            // Best-effort: drop the oversized entry, keep processing the rest.
            continue;
        }

        let dt: DateTime<Utc> = zip_datetime_to_chrono_utc(&entry_header.last_modified());

        let blob = Blob {
            name: file_name,
            content_type: content_type.to_string(),
            content_encoding: content_encoding.to_string(),
            // Fixed marker matching the base64 encoding of `body` below.
            content_transfer_encoding: "base64".to_string(),
            body: general_purpose::STANDARD.encode(entry_data),
            metadata: Metadata {
                zip_name: zip_name.to_string(),
            },
            // Length of the raw entry bytes, not of the base64 string.
            content_length: entry_data.len() as u64,
            last_modified: dt.to_rfc3339(),
        };

        // Reborrow so `wtr` remains usable on the next loop iteration.
        serde_json::to_writer(&mut *wtr, &blob)?;
        writeln!(&mut *wtr)?;
    }

    Ok(())
}
153
/// Tuning knobs shared by the zip-to-JSON pipeline entry points.
// All fields are Copy, so the struct derives Copy/Clone for free; Debug
// makes it loggable like the other public types in this crate.
#[derive(Debug, Clone, Copy)]
pub struct Options<'a> {
    /// Maximum zip file size in bytes to read into memory; larger files are skipped.
    pub max_zip_size: u64,
    /// Value copied into each emitted blob's `content_type` field.
    pub content_type: &'a str,
    /// Value copied into each emitted blob's `content_encoding` field.
    pub content_encoding: &'a str,
    /// Maximum size in bytes of a single archive entry; larger entries are skipped.
    pub max_item_size: u64,
    /// When true, skipped files/entries and errors are reported on stderr.
    pub verbose: bool,
}
161
162pub fn zfilename2zip2blobs2jsons2writer<P, W>(
163    zfilename: P,
164    buf: &mut Vec<u8>,
165    options: &Options,
166    wtr: &mut BufWriter<W>,
167) -> Result<(), io::Error>
168where
169    W: Write,
170    P: AsRef<Path> + Clone,
171{
172    let zfn_for_err = zfilename.as_ref().to_string_lossy().to_string();
173    match filename2buf(zfilename.as_ref(), buf, options.max_zip_size) {
174        Ok(_) => {
175            // Processing continues below
176        }
177        Err(e) => {
178            if options.verbose {
179                match e {
180                    ReadError::SizeLimitExceeded => {
181                        eprintln!(
182                            "level:warn\tstatus:zip_skipped\treason:size_limit_exceeded\tpath:{}",
183                            zfn_for_err
184                        );
185                    }
186                    ReadError::Io(io_err) => {
187                        eprintln!(
188                            "level:warn\tstatus:zip_skipped\treason:read_error\tpath:{}\terror:{}",
189                            zfn_for_err, io_err
190                        );
191                    }
192                }
193            }
194            return Ok(()); // Skip to the next file
195        }
196    };
197
198    let zip_name = zfilename.as_ref().to_string_lossy().to_string();
199
200    if let Err(e) = buf2zip2blobs2jsons2writer(
201        &zip_name,
202        buf,
203        options.content_type,
204        options.content_encoding,
205        options.max_item_size,
206        options.verbose,
207        wtr,
208    ) && options.verbose
209    {
210        eprintln!(
211            "level:warn\tstatus:zip_processing_failed\tpath:{}\treason:{}",
212            zfn_for_err, e
213        );
214    }
215    Ok(())
216}
217
218pub fn zfilenames2zip2blobs2jsons2writer<I, W>(
219    zfilenames: I,
220    buf: &mut Vec<u8>,
221    options: &Options,
222    wtr: &mut BufWriter<W>,
223) -> Result<(), io::Error>
224where
225    W: Write,
226    I: Iterator<Item = Result<String, io::Error>>,
227{
228    for zfilename_res in zfilenames {
229        match zfilename_res {
230            Ok(zfilename) => {
231                if let Err(e) = zfilename2zip2blobs2jsons2writer(&zfilename, buf, options, wtr)
232                    && options.verbose
233                {
234                    eprintln!(
235                        "level:warn\tstatus:unrecoverable_error\tpath:{}\treason:{}",
236                        zfilename, e
237                    );
238                }
239            }
240            Err(e) => {
241                if options.verbose {
242                    eprintln!("level:warn\tstatus:unrecoverable_error\treason:{}", e);
243                }
244            }
245        }
246    }
247    Ok(())
248}
249
250pub fn stdin2zfilenames2zip2blobs2jsons2stdout(
251    max_zip_size: u64,
252    content_type: &str,
253    content_encoding: &str,
254    max_item_size: u64,
255    verbose: bool,
256) -> Result<(), io::Error> {
257    let stdout = io::stdout();
258    let mut writer = BufWriter::new(stdout.lock());
259    let mut buf: Vec<u8> = Vec::with_capacity((1 << 20) * 2);
260    let options = Options {
261        max_zip_size,
262        content_type,
263        content_encoding,
264        max_item_size,
265        verbose,
266    };
267
268    zfilenames2zip2blobs2jsons2writer(stdin2filenames(), &mut buf, &options, &mut writer)?;
269
270    writer.flush()
271}