s2_api/v1/stream/proto/
mod.rs1use compact_str::ToCompactString;
2use s2_common::{record, types};
3
4include!("s2.v1.rs");
5
6impl From<record::StreamPosition> for StreamPosition {
7 fn from(record::StreamPosition { seq_num, timestamp }: record::StreamPosition) -> Self {
8 Self { seq_num, timestamp }
9 }
10}
11
12impl From<Header> for record::Header {
13 fn from(Header { name, value }: Header) -> Self {
14 Self { name, value }
15 }
16}
17
18impl From<record::Header> for Header {
19 fn from(record::Header { name, value }: record::Header) -> Self {
20 Self { name, value }
21 }
22}
23
24impl TryFrom<AppendRecord> for types::stream::AppendRecord {
25 type Error = types::ValidationError;
26
27 fn try_from(
28 AppendRecord {
29 timestamp,
30 headers,
31 body,
32 }: AppendRecord,
33 ) -> Result<Self, Self::Error> {
34 Ok(Self::try_from(types::stream::AppendRecordParts {
35 timestamp,
36 record: record::Record::try_from_parts(
37 headers.into_iter().map(Into::into).collect(),
38 body,
39 )
40 .map_err(|e| e.to_string())?
41 .into(),
42 })?)
43 }
44}
45
46impl TryFrom<AppendInput> for types::stream::AppendInput {
47 type Error = types::ValidationError;
48
49 fn try_from(
50 AppendInput {
51 records,
52 match_seq_num,
53 fencing_token,
54 }: AppendInput,
55 ) -> Result<Self, Self::Error> {
56 let records = records
57 .into_iter()
58 .map(types::stream::AppendRecord::try_from)
59 .collect::<Result<Vec<_>, _>>()?;
60
61 Ok(Self {
62 records: types::stream::AppendRecordBatch::try_from(records)?,
63 match_seq_num,
64 fencing_token: fencing_token
65 .as_deref()
66 .map(|s| s.to_compact_string().try_into())
67 .transpose()?,
68 })
69 }
70}
71
72impl From<types::stream::AppendAck> for AppendAck {
73 fn from(types::stream::AppendAck { start, end, tail }: types::stream::AppendAck) -> Self {
74 Self {
75 start: Some(start.into()),
76 end: Some(end.into()),
77 tail: Some(tail.into()),
78 }
79 }
80}
81
82impl From<record::SequencedRecord> for SequencedRecord {
83 fn from(
84 record::SequencedRecord {
85 position: record::StreamPosition { seq_num, timestamp },
86 record,
87 }: record::SequencedRecord,
88 ) -> Self {
89 let (headers, body) = record.into_parts();
90 Self {
91 seq_num,
92 timestamp,
93 headers: headers.into_iter().map(Into::into).collect(),
94 body,
95 }
96 }
97}
98
99impl From<types::stream::ReadBatch> for ReadBatch {
100 fn from(batch: types::stream::ReadBatch) -> Self {
101 Self {
102 records: batch.records.into_iter().map(Into::into).collect(),
103 tail: batch.tail.map(Into::into),
104 }
105 }
106}