1use std::time::Duration;
2use thiserror::Error;
3
4use crate::{
5 consumer::new_consumer, end_producer, format::Header, new_producer, AsyncFile, FileConsumer,
6 FileErr, FileId, FileProducer, FileResult, DEFAULT_BEACON_INTERVAL, DEFAULT_FILE_SIZE_LIMIT,
7 DEFAULT_PREFETCH_MESSAGE, SEA_STREAMER_WILDCARD,
8};
9use sea_streamer_types::{
10 ConnectOptions as ConnectOptionsTrait, ConsumerGroup, ConsumerMode,
11 ConsumerOptions as ConsumerOptionsTrait, ProducerOptions as ProducerOptionsTrait, StreamErr,
12 StreamKey, StreamUrlErr, Streamer as StreamerTrait, StreamerUri,
13};
14
15#[derive(Debug, Clone)]
16pub struct FileStreamer {
17 file_id: FileId,
18 options: FileConnectOptions,
19}
20
21#[derive(Debug, Clone)]
22pub struct FileConnectOptions {
23 create_file: CreateFileOption,
24 end_with_eos: bool,
25 beacon_interval: u32,
26 file_size_limit: u64,
27 prefetch_message: usize,
28}
29
/// How `FileStreamer::connect` opens the target file.
#[derive(Clone, Debug)]
enum CreateFileOption {
    /// `connect` opens the file with `AsyncFile::new_r`.
    /// NOTE(review): presumably the file must already exist — confirm against `AsyncFile`.
    Never,
    /// `connect` opens the file with `AsyncFile::new_rw`.
    CreateIfNotExists,
    /// `connect` opens the file with `AsyncFile::new_w`.
    /// NOTE(review): presumably fails when the file already exists — confirm against `AsyncFile`.
    Always,
}
38
39#[derive(Debug, Clone)]
40pub struct FileConsumerOptions {
41 mode: ConsumerMode,
42 group: Option<ConsumerGroup>,
43 auto_stream_reset: AutoStreamReset,
44 live_streaming: bool,
45}
46
/// Options used when creating a producer; there is currently nothing to configure.
#[derive(Clone, Debug, Default)]
pub struct FileProducerOptions {}
49
/// Starting position requested by a consumer.
///
/// `create_consumer` combines this with the `live_streaming` flag to pick a
/// [`StreamMode`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AutoStreamReset {
    /// Yields `StreamMode::Replay` when not live-streaming; when live-streaming
    /// it yields `StreamMode::LiveReplay`, or `StreamMode::Live` for an
    /// ungrouped consumer whose file is no larger than the header.
    Earliest,
    /// Yields `StreamMode::Live` when live-streaming; rejected with
    /// `ConfigErr::LatestButNotLive` otherwise.
    Latest,
}
56
/// Streaming mode handed to `new_consumer`, derived in `create_consumer` from
/// [`AutoStreamReset`] and the `live_streaming` flag.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StreamMode {
    /// Chosen for `Latest` with live streaming, and for `Earliest` with live
    /// streaming when an ungrouped consumer's file is no larger than the header.
    Live,
    /// Chosen for `Earliest` without live streaming.
    Replay,
    /// Chosen for `Earliest` with live streaming in all remaining cases.
    LiveReplay,
}
66
67#[derive(Error, Debug, Clone, Copy)]
68pub enum ConfigErr {
69 #[error("Cannot stream from a non-live file at the end")]
70 LatestButNotLive,
71 #[error("Consumers in the same ConsumerGroup must use the same ConsumerMode")]
72 SameGroupSameMode,
73 #[error("Please choose a 'better aligned' beacon interval")]
74 InvalidBeaconInterval,
75 #[error("Wildcard stream can only be subscribed in ungrouped Consumer")]
76 NoWildcardInGroup,
77}
78
79impl StreamerTrait for FileStreamer {
80 type Error = FileErr;
81 type Producer = FileProducer;
82 type Consumer = FileConsumer;
83 type ConnectOptions = FileConnectOptions;
84 type ConsumerOptions = FileConsumerOptions;
85 type ProducerOptions = FileProducerOptions;
86
87 async fn connect(uri: StreamerUri, options: Self::ConnectOptions) -> FileResult<Self> {
90 if uri.nodes().is_empty() {
91 return Err(StreamErr::StreamUrlErr(StreamUrlErr::ZeroNode));
92 }
93 let path = uri
94 .nodes()
95 .first()
96 .unwrap()
97 .as_str()
98 .trim_start_matches("file://")
99 .trim_end_matches('/');
100 let file_id = FileId::new(path);
101 match options.create_file {
102 CreateFileOption::Never => AsyncFile::new_r(file_id.clone()).await,
103 CreateFileOption::CreateIfNotExists => AsyncFile::new_rw(file_id.clone()).await,
104 CreateFileOption::Always => AsyncFile::new_w(file_id.clone()).await,
105 }?;
106 Ok(Self { file_id, options })
107 }
108
109 async fn disconnect(self) -> FileResult<()> {
111 match end_producer(self.file_id).recv_async().await {
112 Ok(Ok(())) => Ok(()),
113 Ok(Err(e)) => Err(StreamErr::Backend(e)),
114 Err(_) => Err(StreamErr::Backend(FileErr::ProducerEnded)),
115 }
116 }
117
118 async fn create_generic_producer(
119 &self,
120 options: Self::ProducerOptions,
121 ) -> FileResult<Self::Producer> {
122 new_producer(self.file_id.clone(), &self.options, &options)
123 .await
124 .map_err(StreamErr::Backend)
125 }
126
127 async fn create_consumer(
130 &self,
131 streams: &[StreamKey],
132 options: Self::ConsumerOptions,
133 ) -> FileResult<Self::Consumer> {
134 match options.mode {
135 ConsumerMode::RealTime => {
136 if options.group.is_some() {
137 return Err(StreamErr::ConsumerGroupIsSet);
138 }
139 }
140 ConsumerMode::Resumable => {
141 return Err(StreamErr::Unsupported(
142 "File does not support Resumable yet".to_owned(),
143 ))
144 }
145 ConsumerMode::LoadBalanced => {
146 if options.group.is_none() {
147 return Err(StreamErr::ConsumerGroupNotSet);
148 }
149 }
150 }
151 let stream_mode = match (options.auto_stream_reset, options.live_streaming) {
152 (AutoStreamReset::Latest, true) => StreamMode::Live,
153 (AutoStreamReset::Earliest, true) => {
154 if options.group.is_none() {
155 let file = AsyncFile::new_r(self.file_id.clone()).await?;
156 if file.size() <= Header::size() as u64 {
157 StreamMode::Live
159 } else {
160 StreamMode::LiveReplay
161 }
162 } else {
163 StreamMode::LiveReplay
164 }
165 }
166 (AutoStreamReset::Earliest, false) => StreamMode::Replay,
167 (AutoStreamReset::Latest, false) => {
168 return Err(StreamErr::Backend(FileErr::ConfigErr(
169 ConfigErr::LatestButNotLive,
170 )))
171 }
172 };
173
174 if options.group.is_some() && streams.iter().any(|s| s.name() == SEA_STREAMER_WILDCARD) {
175 return Err(StreamErr::Backend(FileErr::ConfigErr(
176 ConfigErr::NoWildcardInGroup,
177 )));
178 }
179
180 let consumer = new_consumer(
181 self.file_id.clone(),
182 stream_mode,
183 options.group,
184 streams.to_vec(),
185 self.options.prefetch_message,
186 )
187 .await?;
188 Ok(consumer)
189 }
190}
191
192impl ConnectOptionsTrait for FileConnectOptions {
193 type Error = FileErr;
194
195 fn timeout(&self) -> FileResult<Duration> {
196 Err(StreamErr::TimeoutNotSet)
197 }
198
199 fn set_timeout(&mut self, _: Duration) -> FileResult<&mut Self> {
201 Ok(self)
202 }
203}
204
205impl Default for FileConnectOptions {
206 fn default() -> Self {
207 Self {
208 create_file: CreateFileOption::Never,
209 end_with_eos: false,
210 beacon_interval: DEFAULT_BEACON_INTERVAL,
211 file_size_limit: DEFAULT_FILE_SIZE_LIMIT,
212 prefetch_message: DEFAULT_PREFETCH_MESSAGE,
213 }
214 }
215}
216
217impl FileConnectOptions {
218 pub fn create_if_not_exists(&self) -> bool {
219 matches!(self.create_file, CreateFileOption::CreateIfNotExists)
220 }
221 pub fn set_create_if_not_exists(&mut self, v: bool) -> &mut Self {
223 if v {
224 self.create_file = CreateFileOption::CreateIfNotExists;
225 } else {
226 self.create_file = CreateFileOption::Never;
227 }
228 self
229 }
230
231 pub fn create_only(&self) -> bool {
232 matches!(self.create_file, CreateFileOption::Always)
233 }
234 pub fn set_create_only(&mut self, v: bool) -> &mut Self {
236 if v {
237 self.create_file = CreateFileOption::Always;
238 } else {
239 self.create_file = CreateFileOption::Never;
240 }
241 self
242 }
243
244 pub fn end_with_eos(&self) -> bool {
245 self.end_with_eos
246 }
247 pub fn set_end_with_eos(&mut self, v: bool) -> &mut Self {
252 self.end_with_eos = v;
253 self
254 }
255
256 pub fn beacon_interval(&self) -> u32 {
257 self.beacon_interval
258 }
259 pub fn set_beacon_interval(&mut self, v: u32) -> Result<&mut Self, FileErr> {
263 let valid = v > 0 && v % 1024 == 0;
264 if !valid {
265 return Err(FileErr::ConfigErr(ConfigErr::InvalidBeaconInterval));
266 }
267 self.beacon_interval = v;
268 Ok(self)
269 }
270
271 pub fn file_size_limit(&self) -> u64 {
272 self.file_size_limit
273 }
274 pub fn set_file_size_limit(&mut self, v: u64) -> &mut Self {
276 self.file_size_limit = v;
277 self
278 }
279
280 pub fn prefetch_message(&self) -> usize {
281 self.prefetch_message
282 }
283
284 pub fn set_prefetch_message(&mut self, v: usize) -> &mut Self {
289 self.prefetch_message = v;
290 self
291 }
292}
293
294impl ConsumerOptionsTrait for FileConsumerOptions {
295 type Error = FileErr;
296
297 fn new(mode: ConsumerMode) -> Self {
298 Self {
299 mode,
300 group: None,
301 auto_stream_reset: AutoStreamReset::Latest,
302 live_streaming: true,
303 }
304 }
305
306 fn mode(&self) -> FileResult<&ConsumerMode> {
307 Ok(&self.mode)
308 }
309
310 fn consumer_group(&self) -> FileResult<&ConsumerGroup> {
311 self.group.as_ref().ok_or(StreamErr::ConsumerGroupNotSet)
312 }
313
314 fn set_consumer_group(&mut self, group: ConsumerGroup) -> FileResult<&mut Self> {
317 self.group = Some(group);
318 Ok(self)
319 }
320}
321
322impl FileConsumerOptions {
323 pub fn set_auto_stream_reset(&mut self, v: AutoStreamReset) -> &mut Self {
327 self.auto_stream_reset = v;
328 self
329 }
330 pub fn auto_stream_reset(&self) -> &AutoStreamReset {
331 &self.auto_stream_reset
332 }
333
334 pub fn set_live_streaming(&mut self, v: bool) -> &mut Self {
338 self.live_streaming = v;
339 self
340 }
341 pub fn live_streaming(&self) -> &bool {
342 &self.live_streaming
343 }
344}
345
346impl Default for FileConsumerOptions {
347 fn default() -> Self {
348 Self::new(ConsumerMode::RealTime)
349 }
350}
351
352impl ProducerOptionsTrait for FileProducerOptions {}