aws_multipart_upload/codec/
csv_writer.rs

1use crate::AWS_MIN_PART_SIZE;
2use crate::client::part::PartBody;
3use crate::codec::{EncodeError, EncodeErrorKind, PartEncoder};
4
5use bytesize::ByteSize;
6use csv::{Error as CsvError, Writer, WriterBuilder};
7use serde::Serialize;
8
9/// `CsvEncoder` implements `PartEncoder` by writing items to the part in CSV
10/// format.
11#[derive(Debug)]
12pub struct CsvEncoder {
13    writer: Writer<PartBody>,
14    write_header: bool,
15    capacity: u64,
16}
17
18impl CsvEncoder {
19    /// Write a header row from the item as the first line in the upload.
20    pub fn with_header(self) -> Self {
21        Self {
22            write_header: true,
23            ..self
24        }
25    }
26
27    /// Initial capacity allocated for the CSV writer.
28    pub fn with_capacity(self, capacity: ByteSize) -> Self {
29        Self {
30            capacity: capacity.as_u64(),
31            ..self
32        }
33    }
34}
35
36impl Default for CsvEncoder {
37    fn default() -> Self {
38        let capacity = AWS_MIN_PART_SIZE.as_u64();
39        let part = PartBody::with_capacity(capacity as usize);
40        let mut builder = WriterBuilder::new();
41        let writer = builder
42            .buffer_capacity(capacity as usize)
43            .has_headers(false)
44            .from_writer(part);
45
46        Self {
47            writer,
48            write_header: false,
49            capacity,
50        }
51    }
52}
53
54impl<Item: Serialize> PartEncoder<Item> for CsvEncoder {
55    type Error = CsvError;
56
57    fn restore(&self) -> Result<Self, Self::Error> {
58        let cap = self.writer.get_ref().capacity();
59        let part = PartBody::with_capacity(cap);
60        let mut builder = WriterBuilder::new();
61        let writer = if self.write_header {
62            builder
63                .buffer_capacity(self.capacity as usize)
64                .from_writer(part)
65        } else {
66            builder
67                .buffer_capacity(self.capacity as usize)
68                .has_headers(false)
69                .from_writer(part)
70        };
71
72        Ok(Self {
73            writer,
74            write_header: self.write_header,
75            capacity: self.capacity,
76        })
77    }
78
79    fn encode(&mut self, item: Item) -> Result<usize, Self::Error> {
80        let before = self.writer.get_ref().size();
81        self.writer.serialize(item)?;
82        self.writer.flush()?;
83        let after = self.writer.get_ref().size();
84        Ok(after - before)
85    }
86
87    fn flush(&mut self) -> Result<(), Self::Error> {
88        self.writer.flush()?;
89        Ok(())
90    }
91
92    fn into_body(self) -> Result<PartBody, Self::Error> {
93        match self.writer.into_inner() {
94            Ok(body) => Ok(body),
95            Err(e) => Err(e.into_error())?,
96        }
97    }
98
99    fn clear(&self) -> Result<Self, Self::Error> {
100        let cap = self.writer.get_ref().capacity();
101        let part = PartBody::with_capacity(cap);
102        let mut builder = WriterBuilder::new();
103        let writer = builder
104            .buffer_capacity(self.capacity as usize)
105            .has_headers(false)
106            .from_writer(part);
107
108        Ok(Self {
109            writer,
110            write_header: self.write_header,
111            capacity: self.capacity,
112        })
113    }
114}
115
116impl EncodeError for CsvError {
117    fn message(&self) -> String {
118        self.to_string()
119    }
120
121    fn kind(&self) -> EncodeErrorKind {
122        match self.kind() {
123            csv::ErrorKind::Io(_) => EncodeErrorKind::Io,
124            csv::ErrorKind::UnequalLengths { .. }
125            | csv::ErrorKind::Utf8 { .. }
126            | csv::ErrorKind::Deserialize { .. }
127            | csv::ErrorKind::Serialize(_) => EncodeErrorKind::Data,
128            csv::ErrorKind::Seek => EncodeErrorKind::Eof,
129            _ => EncodeErrorKind::Unknown,
130        }
131    }
132}