sea_streamer_file/
streamer.rs

1use std::time::Duration;
2use thiserror::Error;
3
4use crate::{
5    consumer::new_consumer, end_producer, format::Header, new_producer, AsyncFile, FileConsumer,
6    FileErr, FileId, FileProducer, FileResult, DEFAULT_BEACON_INTERVAL, DEFAULT_FILE_SIZE_LIMIT,
7    DEFAULT_PREFETCH_MESSAGE, SEA_STREAMER_WILDCARD,
8};
9use sea_streamer_types::{
10    ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode,
11    ConsumerOptions as ConsumerOptionsTrait, ProducerOptions as ProducerOptionsTrait, StreamErr,
12    StreamKey, StreamUrlErr, Streamer as StreamerTrait, StreamerUri,
13};
14
15#[derive(Debug, Clone)]
16pub struct FileStreamer {
17    file_id: FileId,
18    options: FileConnectOptions,
19}
20
21#[derive(Debug, Clone)]
22pub struct FileConnectOptions {
23    create_file: CreateFileOption,
24    end_with_eos: bool,
25    beacon_interval: u32,
26    file_size_limit: u64,
27    prefetch_message: usize,
28}
29
30#[derive(Debug, Clone)]
31enum CreateFileOption {
32    /// File must already exists
33    Never,
34    CreateIfNotExists,
35    /// Fail if the file already exists
36    Always,
37}
38
39#[derive(Debug, Clone)]
40pub struct FileConsumerOptions {
41    mode: ConsumerMode,
42    group: Option<ConsumerGroup>,
43    auto_stream_reset: AutoStreamReset,
44    live_streaming: bool,
45}
46
47#[derive(Debug, Default, Clone)]
48pub struct FileProducerOptions {}
49
50#[derive(Debug, Copy, Clone, PartialEq, Eq)]
51/// Where to start streaming from.
52pub enum AutoStreamReset {
53    Earliest,
54    Latest,
55}
56
57#[derive(Debug, Copy, Clone, PartialEq, Eq)]
58pub enum StreamMode {
59    /// Streaming from a file at the end
60    Live,
61    /// Replaying a dead file
62    Replay,
63    /// Replaying a live file, might catch up to live
64    LiveReplay,
65}
66
67#[derive(Error, Debug, Clone, Copy)]
68pub enum ConfigErr {
69    #[error("Cannot stream from a non-live file at the end")]
70    LatestButNotLive,
71    #[error("Consumers in the same ConsumerGroup must use the same ConsumerMode")]
72    SameGroupSameMode,
73    #[error("Please choose a 'better aligned' beacon interval")]
74    InvalidBeaconInterval,
75    #[error("Wildcard stream can only be subscribed in ungrouped Consumer")]
76    NoWildcardInGroup,
77}
78
79impl StreamerTrait for FileStreamer {
80    type Error = FileErr;
81    type Producer = FileProducer;
82    type Consumer = FileConsumer;
83    type ConnectOptions = FileConnectOptions;
84    type ConsumerOptions = FileConsumerOptions;
85    type ProducerOptions = FileProducerOptions;
86
87    /// First check whether the file exists.
88    /// If not, depending on the options, either create it, or error.
89    async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> FileResult<Self> {
90        if uri.nodes().is_empty() {
91            return Err(StreamErr::StreamUrlErr(StreamUrlErr::ZeroNode));
92        }
93        let path = uri
94            .nodes()
95            .first()
96            .unwrap()
97            .as_str()
98            .trim_start_matches("file://")
99            .trim_end_matches('/');
100        let file_id = FileId::new(path);
101        match options.create_file {
102            CreateFileOption::Never => AsyncFile::new_r(file_id.clone()).await,
103            CreateFileOption::CreateIfNotExists => AsyncFile::new_rw(file_id.clone()).await,
104            CreateFileOption::Always => AsyncFile::new_w(file_id.clone()).await,
105        }?;
106        Ok(Self { file_id, options })
107    }
108
109    /// End the producers before disconnecting.
110    async fn disconnect(self) -> FileResult<()> {
111        match end_producer(self.file_id).recv_async().await {
112            Ok(Ok(())) => Ok(()),
113            Ok(Err(e)) => Err(StreamErr::Backend(e)),
114            Err(_) => Err(StreamErr::Backend(FileErr::ProducerEnded)),
115        }
116    }
117
118    async fn create_generic_producer(
119        &self,
120        options: Self::ProducerOptions,
121    ) -> FileResult<Self::Producer> {
122        new_producer(self.file_id.clone(), &self.options, &options)
123            .await
124            .map_err(StreamErr::Backend)
125    }
126
127    /// To subscribe to all streams in a file, you can use the magic [`SEA_STREAMER_WILDCARD`] stream key.
128    /// For `RealTime` consumer only.
129    async fn create_consumer(
130        &self,
131        streams: &[StreamKey],
132        options: Self::ConsumerOptions,
133    ) -> FileResult<Self::Consumer> {
134        match options.mode {
135            ConsumerMode::RealTime => {
136                if options.group.is_some() {
137                    return Err(StreamErr::ConsumerGroupIsSet);
138                }
139            }
140            ConsumerMode::Resumable => {
141                return Err(StreamErr::Unsupported(
142                    "File does not support Resumable yet".to_owned(),
143                ))
144            }
145            ConsumerMode::LoadBalanced => {
146                if options.group.is_none() {
147                    return Err(StreamErr::ConsumerGroupNotSet);
148                }
149            }
150        }
151        let stream_mode = match (options.auto_stream_reset, options.live_streaming) {
152            (AutoStreamReset::Latest, true) => StreamMode::Live,
153            (AutoStreamReset::Earliest, true) => {
154                if options.group.is_none() {
155                    let file = AsyncFile::new_r(self.file_id.clone()).await?;
156                    if file.size() <= Header::size() as u64 {
157                        // special case when the file has no data
158                        StreamMode::Live
159                    } else {
160                        StreamMode::LiveReplay
161                    }
162                } else {
163                    StreamMode::LiveReplay
164                }
165            }
166            (AutoStreamReset::Earliest, false) => StreamMode::Replay,
167            (AutoStreamReset::Latest, false) => {
168                return Err(StreamErr::Backend(FileErr::ConfigErr(
169                    ConfigErr::LatestButNotLive,
170                )))
171            }
172        };
173
174        if options.group.is_some() && streams.iter().any(|s| s.name() == SEA_STREAMER_WILDCARD) {
175            return Err(StreamErr::Backend(FileErr::ConfigErr(
176                ConfigErr::NoWildcardInGroup,
177            )));
178        }
179
180        let consumer = new_consumer(
181            self.file_id.clone(),
182            stream_mode,
183            options.group,
184            streams.to_vec(),
185            self.options.prefetch_message,
186        )
187        .await?;
188        Ok(consumer)
189    }
190}
191
192impl ConnectOptionsTrait for FileConnectOptions {
193    type Error = FileErr;
194
195    fn timeout(&self) -> FileResult<Duration> {
196        Err(StreamErr::TimeoutNotSet)
197    }
198
199    /// This parameter is ignored.
200    fn set_timeout(&mut self, _: Duration) -> FileResult<&mut Self> {
201        Ok(self)
202    }
203}
204
205impl Default for FileConnectOptions {
206    fn default() -> Self {
207        Self {
208            create_file: CreateFileOption::Never,
209            end_with_eos: false,
210            beacon_interval: DEFAULT_BEACON_INTERVAL,
211            file_size_limit: DEFAULT_FILE_SIZE_LIMIT,
212            prefetch_message: DEFAULT_PREFETCH_MESSAGE,
213        }
214    }
215}
216
217impl FileConnectOptions {
218    pub fn create_if_not_exists(&self) -> bool {
219        matches!(self.create_file, CreateFileOption::CreateIfNotExists)
220    }
221    /// Default is `false`.
222    pub fn set_create_if_not_exists(&mut self, v: bool) -> &mut Self {
223        if v {
224            self.create_file = CreateFileOption::CreateIfNotExists;
225        } else {
226            self.create_file = CreateFileOption::Never;
227        }
228        self
229    }
230
231    pub fn create_only(&self) -> bool {
232        matches!(self.create_file, CreateFileOption::Always)
233    }
234    /// Always create the file. Fail if already exists. Default is `false`.
235    pub fn set_create_only(&mut self, v: bool) -> &mut Self {
236        if v {
237            self.create_file = CreateFileOption::Always;
238        } else {
239            self.create_file = CreateFileOption::Never;
240        }
241        self
242    }
243
244    pub fn end_with_eos(&self) -> bool {
245        self.end_with_eos
246    }
247    /// If true, when the producer ends, a End-of-Stream message will be written.
248    /// This signals Consumers to end their streams.
249    ///
250    /// Default is `false`.
251    pub fn set_end_with_eos(&mut self, v: bool) -> &mut Self {
252        self.end_with_eos = v;
253        self
254    }
255
256    pub fn beacon_interval(&self) -> u32 {
257        self.beacon_interval
258    }
259    /// Beacon interval. Should be multiples of 1024 (1KB).
260    ///
261    /// Default is [`crate::DEFAULT_BEACON_INTERVAL`].
262    pub fn set_beacon_interval(&mut self, v: u32) -> Result<&mut Self, FileErr> {
263        let valid = v > 0 && v % 1024 == 0;
264        if !valid {
265            return Err(FileErr::ConfigErr(ConfigErr::InvalidBeaconInterval));
266        }
267        self.beacon_interval = v;
268        Ok(self)
269    }
270
271    pub fn file_size_limit(&self) -> u64 {
272        self.file_size_limit
273    }
274    /// Default is [`crate::DEFAULT_FILE_SIZE_LIMIT`].
275    pub fn set_file_size_limit(&mut self, v: u64) -> &mut Self {
276        self.file_size_limit = v;
277        self
278    }
279
280    pub fn prefetch_message(&self) -> usize {
281        self.prefetch_message
282    }
283
284    /// Number of messages to prefetch. A larger number would lead to higher memory usage.
285    /// Choose the number by considering the typical size of messages.
286    ///
287    /// Default is [`crate::DEFAULT_PREFETCH_MESSAGE`].
288    pub fn set_prefetch_message(&mut self, v: usize) -> &mut Self {
289        self.prefetch_message = v;
290        self
291    }
292}
293
294impl ConsumerOptionsTrait for FileConsumerOptions {
295    type Error = FileErr;
296
297    fn new(mode: ConsumerMode) -> Self {
298        Self {
299            mode,
300            group: None,
301            auto_stream_reset: AutoStreamReset::Latest,
302            live_streaming: true,
303        }
304    }
305
306    fn mode(&self) -> FileResult<&ConsumerMode> {
307        Ok(&self.mode)
308    }
309
310    fn consumer_group(&self) -> FileResult<&ConsumerGroup> {
311        self.group.as_ref().ok_or(StreamErr::ConsumerGroupNotSet)
312    }
313
314    /// If multiple consumers share the same group, only one in the group will receive a message.
315    /// This is load-balanced in a round-robin fashion.
316    fn set_consumer_group(&mut self, group: ConsumerGroup) -> FileResult<&mut Self> {
317        self.group = Some(group);
318        Ok(self)
319    }
320}
321
322impl FileConsumerOptions {
323    /// Where to stream from the file.
324    ///
325    /// If unset, defaults to `Latest`.
326    pub fn set_auto_stream_reset(&mut self, v: AutoStreamReset) -> &mut Self {
327        self.auto_stream_reset = v;
328        self
329    }
330    pub fn auto_stream_reset(&self) -> &AutoStreamReset {
331        &self.auto_stream_reset
332    }
333
334    /// If true, follow the file like `tail -f` and read new messages as there is more data.
335    ///
336    /// If unset, defaults to `true`.
337    pub fn set_live_streaming(&mut self, v: bool) -> &mut Self {
338        self.live_streaming = v;
339        self
340    }
341    pub fn live_streaming(&self) -> &bool {
342        &self.live_streaming
343    }
344}
345
346impl Default for FileConsumerOptions {
347    fn default() -> Self {
348        Self::new(ConsumerMode::RealTime)
349    }
350}
351
352impl ProducerOptionsTrait for FileProducerOptions {}