aws_multipart_upload/codec/
csv_writer.rs1use crate::AWS_MIN_PART_SIZE;
2use crate::client::part::PartBody;
3use crate::codec::{EncodeError, EncodeErrorKind, PartEncoder};
4
5use bytesize::ByteSize;
6use csv::{Error as CsvError, Writer, WriterBuilder};
7use serde::Serialize;
8
9#[derive(Debug)]
12pub struct CsvEncoder {
13 writer: Writer<PartBody>,
14 write_header: bool,
15 capacity: u64,
16}
17
18impl CsvEncoder {
19 pub fn with_header(self) -> Self {
21 Self {
22 write_header: true,
23 ..self
24 }
25 }
26
27 pub fn with_capacity(self, capacity: ByteSize) -> Self {
29 Self {
30 capacity: capacity.as_u64(),
31 ..self
32 }
33 }
34}
35
36impl Default for CsvEncoder {
37 fn default() -> Self {
38 let capacity = AWS_MIN_PART_SIZE.as_u64();
39 let part = PartBody::with_capacity(capacity as usize);
40 let mut builder = WriterBuilder::new();
41 let writer = builder
42 .buffer_capacity(capacity as usize)
43 .has_headers(false)
44 .from_writer(part);
45
46 Self {
47 writer,
48 write_header: false,
49 capacity,
50 }
51 }
52}
53
54impl<Item: Serialize> PartEncoder<Item> for CsvEncoder {
55 type Error = CsvError;
56
57 fn restore(&self) -> Result<Self, Self::Error> {
58 let cap = self.writer.get_ref().capacity();
59 let part = PartBody::with_capacity(cap);
60 let mut builder = WriterBuilder::new();
61 let writer = if self.write_header {
62 builder
63 .buffer_capacity(self.capacity as usize)
64 .from_writer(part)
65 } else {
66 builder
67 .buffer_capacity(self.capacity as usize)
68 .has_headers(false)
69 .from_writer(part)
70 };
71
72 Ok(Self {
73 writer,
74 write_header: self.write_header,
75 capacity: self.capacity,
76 })
77 }
78
79 fn encode(&mut self, item: Item) -> Result<usize, Self::Error> {
80 let before = self.writer.get_ref().size();
81 self.writer.serialize(item)?;
82 self.writer.flush()?;
83 let after = self.writer.get_ref().size();
84 Ok(after - before)
85 }
86
87 fn flush(&mut self) -> Result<(), Self::Error> {
88 self.writer.flush()?;
89 Ok(())
90 }
91
92 fn into_body(self) -> Result<PartBody, Self::Error> {
93 match self.writer.into_inner() {
94 Ok(body) => Ok(body),
95 Err(e) => Err(e.into_error())?,
96 }
97 }
98
99 fn clear(&self) -> Result<Self, Self::Error> {
100 let cap = self.writer.get_ref().capacity();
101 let part = PartBody::with_capacity(cap);
102 let mut builder = WriterBuilder::new();
103 let writer = builder
104 .buffer_capacity(self.capacity as usize)
105 .has_headers(false)
106 .from_writer(part);
107
108 Ok(Self {
109 writer,
110 write_header: self.write_header,
111 capacity: self.capacity,
112 })
113 }
114}
115
116impl EncodeError for CsvError {
117 fn message(&self) -> String {
118 self.to_string()
119 }
120
121 fn kind(&self) -> EncodeErrorKind {
122 match self.kind() {
123 csv::ErrorKind::Io(_) => EncodeErrorKind::Io,
124 csv::ErrorKind::UnequalLengths { .. }
125 | csv::ErrorKind::Utf8 { .. }
126 | csv::ErrorKind::Deserialize { .. }
127 | csv::ErrorKind::Serialize(_) => EncodeErrorKind::Data,
128 csv::ErrorKind::Seek => EncodeErrorKind::Eof,
129 _ => EncodeErrorKind::Unknown,
130 }
131 }
132}