proxy_sdk/
stream.rs

1use std::ops::RangeBounds;
2
3use crate::{
4    calculate_range,
5    context::BaseContext,
6    hostcalls::{self, BufferType},
7    log_concern,
8    property::envoy::Attributes,
9};
10
11/// Defines control functions for streams
12pub trait StreamControl {
13    /// Retrieve attributes for the stream data
14    fn attributes(&self) -> &Attributes;
15
16    /// TODO: UNKNOWN PURPOSE
17    fn resume_downstream(&self) {
18        log_concern("resume-downstream", hostcalls::resume_downstream());
19    }
20
21    /// TODO: UNKNOWN PURPOSE
22    fn close_downstream(&self) {
23        log_concern("close-downstream", hostcalls::close_downstream());
24    }
25
26    /// TODO: UNKNOWN PURPOSE
27    fn resume_upstream(&self) {
28        log_concern("resume-upstream", hostcalls::resume_upstream());
29    }
30
31    /// TODO: UNKNOWN PURPOSE
32    fn close_upstream(&self) {
33        log_concern("close-upstream", hostcalls::close_upstream());
34    }
35}
36
37/// Defines functions to interact with stream data
38pub trait StreamDataControl {
39    /// Upstream or Downstream
40    const TYPE: StreamType;
41
42    /// Length of this chunk of data
43    fn data_size(&self) -> usize;
44
45    /// If true, this will be the last downstream data for this context.
46    fn end_of_stream(&self) -> bool;
47
48    /// Get all data
49    fn all(&self) -> Option<Vec<u8>> {
50        self.get(..)
51    }
52
53    /// Get a range of data
54    fn get(&self, range: impl RangeBounds<usize>) -> Option<Vec<u8>> {
55        let (start, size) = calculate_range(range, self.data_size());
56        log_concern(
57            Self::TYPE.get(),
58            hostcalls::get_buffer(Self::TYPE.buffer(), start, size),
59        )
60    }
61
62    /// Replace a range of data with `value`.
63    fn set(&self, range: impl RangeBounds<usize>, value: &[u8]) {
64        let (start, size) = calculate_range(range, self.data_size());
65        log_concern(
66            Self::TYPE.set(),
67            hostcalls::set_buffer(Self::TYPE.buffer(), start, size, value),
68        );
69    }
70
71    /// Replace the entire data with `value`
72    fn replace(&self, value: &[u8]) {
73        self.set(.., value);
74    }
75
76    /// Clear the data
77    fn clear(&self) {
78        self.replace(&[]);
79    }
80
81    /// Writes data directly upstream, should be called from downstream context.
82    #[cfg(not(target_arch = "wasm32"))]
83    fn write_upstream(&self, data: &[u8]) {
84        log_concern("write_upstream", hostcalls::write_upstream(data));
85    }
86
87    /// Writes data directly downstream, should be called from upstream context.
88    #[cfg(not(target_arch = "wasm32"))]
89    fn write_downstream(&self, data: &[u8]) {
90        log_concern("write_downstream", hostcalls::write_downstream(data));
91    }
92}
93
/// Status value returned by the data and connection callbacks of
/// [`StreamContext`].
///
/// Represented as a `usize` (`#[repr(usize)]`). The enum is
/// `#[non_exhaustive]`, so code outside this crate must include a wildcard arm
/// when matching on it.
#[repr(usize)]
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
#[non_exhaustive]
pub enum FilterStreamStatus {
    /// Discriminant `0`; the value returned by every default callback in
    /// [`StreamContext`].
    Continue = 0,
    /// Discriminant `1`.
    ///
    /// NOTE(review): presumably tells the host to hold the data back until the
    /// stream is resumed — confirm against the host ABI.
    StopIteration = 1,
}
101
102#[derive(Debug)]
103pub enum StreamType {
104    Upstream,
105    Downstream,
106}
107
108impl StreamType {
109    const fn get(&self) -> &'static str {
110        match self {
111            Self::Upstream => "get-upstream-data",
112            Self::Downstream => "get-downstream-data",
113        }
114    }
115
116    const fn set(&self) -> &'static str {
117        match self {
118            Self::Upstream => "set-upstream-data",
119            Self::Downstream => "set-downstream-data",
120        }
121    }
122
123    const fn buffer(&self) -> BufferType {
124        match self {
125            Self::Upstream => BufferType::UpstreamData,
126            Self::Downstream => BufferType::DownstreamData,
127        }
128    }
129}
130
/// Upstream data reference for a Stream filter.
///
/// Passed by reference to [`StreamContext::on_upstream_data`]. The bytes
/// themselves are not stored in this struct; they are read and written through
/// the [`StreamDataControl`] methods.
pub struct UpstreamData {
    /// Value reported by [`StreamDataControl::data_size`].
    pub(crate) data_size: usize,
    /// Value reported by [`StreamDataControl::end_of_stream`].
    pub(crate) end_of_stream: bool,
    /// Attributes exposed through [`StreamControl::attributes`].
    pub(crate) attributes: Attributes,
}
137
138impl StreamControl for UpstreamData {
139    fn attributes(&self) -> &Attributes {
140        &self.attributes
141    }
142}
143
144impl StreamDataControl for UpstreamData {
145    const TYPE: StreamType = StreamType::Upstream;
146
147    fn data_size(&self) -> usize {
148        self.data_size
149    }
150
151    fn end_of_stream(&self) -> bool {
152        self.end_of_stream
153    }
154}
155
/// Downstream data reference for a Stream filter.
///
/// Passed by reference to [`StreamContext::on_downstream_data`]. The bytes
/// themselves are not stored in this struct; they are read and written through
/// the [`StreamDataControl`] methods.
pub struct DownstreamData {
    /// Value reported by [`StreamDataControl::data_size`].
    pub(crate) data_size: usize,
    /// Value reported by [`StreamDataControl::end_of_stream`].
    pub(crate) end_of_stream: bool,
    /// Attributes exposed through [`StreamControl::attributes`].
    pub(crate) attributes: Attributes,
}
162
163impl StreamControl for DownstreamData {
164    fn attributes(&self) -> &Attributes {
165        &self.attributes
166    }
167}
168
169impl StreamDataControl for DownstreamData {
170    const TYPE: StreamType = StreamType::Downstream;
171
172    fn data_size(&self) -> usize {
173        self.data_size
174    }
175
176    fn end_of_stream(&self) -> bool {
177        self.end_of_stream
178    }
179}
180
/// Which side initiated the close of a stream, as reported by
/// [`StreamClose::close_type`].
///
/// Represented as a `usize` (`#[repr(usize)]`). The enum is
/// `#[non_exhaustive]`, so code outside this crate must include a wildcard arm
/// when matching on it.
#[repr(usize)]
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
#[non_exhaustive]
pub enum CloseType {
    /// Discriminant `0`.
    ///
    /// NOTE(review): presumably used when the initiator of the close is not
    /// known — confirm against the host ABI.
    Unknown = 0,
    /// Close initiated by the proxy
    Local = 1,
    /// Close initiated by the peer
    Remote = 2,
}
191
/// Context for a stream closing event.
///
/// Passed by reference to [`StreamContext::on_downstream_close`] and
/// [`StreamContext::on_upstream_close`].
pub struct StreamClose {
    /// Value returned by [`StreamClose::close_type`].
    pub(crate) close_type: CloseType,
    /// Attributes exposed through [`StreamControl::attributes`].
    pub(crate) attributes: Attributes,
}
197
198impl StreamClose {
199    /// Get close type of closed peer
200    pub fn close_type(&self) -> CloseType {
201        self.close_type
202    }
203}
204
205impl StreamControl for StreamClose {
206    fn attributes(&self) -> &Attributes {
207        &self.attributes
208    }
209}
210
/// Trait to implement stream filters (L4 filters).
///
/// Every callback has a default implementation: the status-returning ones
/// yield [`FilterStreamStatus::Continue`] and the close callbacks do nothing,
/// so an implementor only overrides the events it cares about.
// The default bodies ignore their arguments, hence the lint allowance.
#[allow(unused_variables)]
pub trait StreamContext: BaseContext {
    /// Called on a new connection.
    /// TODO: FilterStreamStatus effect unknown.
    fn on_new_connection(&mut self) -> FilterStreamStatus {
        FilterStreamStatus::Continue
    }

    /// Called when a chunk of downstream data is available.
    /// `FilterStreamStatus::StopIteration` will delay flushing of data until `FilterStreamStatus::Continue` is returned.
    /// NOTE(review): this note previously named `FilterStreamStatus::Pause`, which is not a variant of
    /// [`FilterStreamStatus`]; `StopIteration` is assumed to be the intended variant — confirm.
    /// TODO: `resume_downstream` might be able to trigger this from another context?
    fn on_downstream_data(&mut self, data: &DownstreamData) -> FilterStreamStatus {
        FilterStreamStatus::Continue
    }

    /// Called when a downstream connection closes.
    fn on_downstream_close(&mut self, data: &StreamClose) {}

    /// Called when a chunk of upstream data is available.
    /// `FilterStreamStatus::StopIteration` will delay flushing of data until `FilterStreamStatus::Continue` is returned.
    /// NOTE(review): this note previously named `FilterStreamStatus::Pause`, which is not a variant of
    /// [`FilterStreamStatus`]; `StopIteration` is assumed to be the intended variant — confirm.
    /// TODO: `resume_upstream` might be able to trigger this from another context? (The note originally said
    /// `resume_downstream`, which looks like a copy-paste slip from the downstream callback — confirm.)
    fn on_upstream_data(&mut self, data: &UpstreamData) -> FilterStreamStatus {
        FilterStreamStatus::Continue
    }

    /// Called when an upstream connection closes.
    fn on_upstream_close(&mut self, data: &StreamClose) {}
}
239}