s2_api/v1/stream/proto/
mod.rs

1use compact_str::ToCompactString;
2use s2_common::{record, types};
3
4include!("s2.v1.rs");
5
6impl From<record::StreamPosition> for StreamPosition {
7    fn from(record::StreamPosition { seq_num, timestamp }: record::StreamPosition) -> Self {
8        Self { seq_num, timestamp }
9    }
10}
11
12impl From<Header> for record::Header {
13    fn from(Header { name, value }: Header) -> Self {
14        Self { name, value }
15    }
16}
17
18impl From<record::Header> for Header {
19    fn from(record::Header { name, value }: record::Header) -> Self {
20        Self { name, value }
21    }
22}
23
24impl TryFrom<AppendRecord> for types::stream::AppendRecord {
25    type Error = types::ValidationError;
26
27    fn try_from(
28        AppendRecord {
29            timestamp,
30            headers,
31            body,
32        }: AppendRecord,
33    ) -> Result<Self, Self::Error> {
34        Ok(Self::try_from(types::stream::AppendRecordParts {
35            timestamp,
36            record: record::Record::try_from_parts(
37                headers.into_iter().map(Into::into).collect(),
38                body,
39            )
40            .map_err(|e| e.to_string())?
41            .into(),
42        })?)
43    }
44}
45
46impl TryFrom<AppendInput> for types::stream::AppendInput {
47    type Error = types::ValidationError;
48
49    fn try_from(
50        AppendInput {
51            records,
52            match_seq_num,
53            fencing_token,
54        }: AppendInput,
55    ) -> Result<Self, Self::Error> {
56        let records = records
57            .into_iter()
58            .map(types::stream::AppendRecord::try_from)
59            .collect::<Result<Vec<_>, _>>()?;
60
61        Ok(Self {
62            records: types::stream::AppendRecordBatch::try_from(records)?,
63            match_seq_num,
64            fencing_token: fencing_token
65                .as_deref()
66                .map(|s| s.to_compact_string().try_into())
67                .transpose()?,
68        })
69    }
70}
71
72impl From<types::stream::AppendAck> for AppendAck {
73    fn from(types::stream::AppendAck { start, end, tail }: types::stream::AppendAck) -> Self {
74        Self {
75            start: Some(start.into()),
76            end: Some(end.into()),
77            tail: Some(tail.into()),
78        }
79    }
80}
81
82impl From<record::SequencedRecord> for SequencedRecord {
83    fn from(
84        record::SequencedRecord {
85            position: record::StreamPosition { seq_num, timestamp },
86            record,
87        }: record::SequencedRecord,
88    ) -> Self {
89        let (headers, body) = record.into_parts();
90        Self {
91            seq_num,
92            timestamp,
93            headers: headers.into_iter().map(Into::into).collect(),
94            body,
95        }
96    }
97}
98
99impl From<types::stream::ReadBatch> for ReadBatch {
100    fn from(batch: types::stream::ReadBatch) -> Self {
101        Self {
102            records: batch.records.into_iter().map(Into::into).collect(),
103            tail: batch.tail.map(Into::into),
104        }
105    }
106}