1use crate::error::{IoError, IoResult};
7use async_trait::async_trait;
8use scirs2_core::ndarray::Array1;
9
10#[async_trait]
12pub trait StreamTransform: Send + Sync {
13 async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>>;
15
16 async fn finalize(&mut self) -> IoResult<()> {
18 Ok(())
19 }
20
21 fn name(&self) -> &str {
23 "StreamTransform"
24 }
25}
26
27pub struct Pipeline {
29 transforms: Vec<Box<dyn StreamTransform>>,
30}
31
32impl Pipeline {
33 pub fn new() -> Self {
35 Self {
36 transforms: Vec::new(),
37 }
38 }
39
40 pub fn add_transform<T: StreamTransform + 'static>(mut self, transform: T) -> Self {
42 self.transforms.push(Box::new(transform));
43 self
44 }
45
46 pub async fn process(&mut self, mut data: Array1<f32>) -> IoResult<Array1<f32>> {
48 for transform in &mut self.transforms {
49 data = transform.transform(data).await?;
50 }
51 Ok(data)
52 }
53
54 pub async fn process_batch(&mut self, batch: Vec<Array1<f32>>) -> IoResult<Vec<Array1<f32>>> {
56 let mut results = Vec::with_capacity(batch.len());
57 for data in batch {
58 results.push(self.process(data).await?);
59 }
60 Ok(results)
61 }
62
63 pub async fn finalize(&mut self) -> IoResult<()> {
65 for transform in &mut self.transforms {
66 transform.finalize().await?;
67 }
68 Ok(())
69 }
70
71 pub fn len(&self) -> usize {
73 self.transforms.len()
74 }
75
76 pub fn is_empty(&self) -> bool {
78 self.transforms.is_empty()
79 }
80}
81
82impl Default for Pipeline {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88pub struct ScaleTransform {
90 scale: f32,
91}
92
93impl ScaleTransform {
94 pub fn new(scale: f32) -> Self {
95 Self { scale }
96 }
97}
98
99#[async_trait]
100impl StreamTransform for ScaleTransform {
101 async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
102 Ok(input * self.scale)
103 }
104
105 fn name(&self) -> &str {
106 "ScaleTransform"
107 }
108}
109
110pub struct OffsetTransform {
112 offset: f32,
113}
114
115impl OffsetTransform {
116 pub fn new(offset: f32) -> Self {
117 Self { offset }
118 }
119}
120
121#[async_trait]
122impl StreamTransform for OffsetTransform {
123 async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
124 Ok(input + self.offset)
125 }
126
127 fn name(&self) -> &str {
128 "OffsetTransform"
129 }
130}
131
132pub struct ClipTransform {
134 min: f32,
135 max: f32,
136}
137
138impl ClipTransform {
139 pub fn new(min: f32, max: f32) -> Self {
140 Self { min, max }
141 }
142}
143
144#[async_trait]
145impl StreamTransform for ClipTransform {
146 async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
147 Ok(input.mapv(|x| x.clamp(self.min, self.max)))
148 }
149
150 fn name(&self) -> &str {
151 "ClipTransform"
152 }
153}
154
155pub struct NormalizeTransform {
157 min_val: Option<f32>,
158 max_val: Option<f32>,
159 centered: bool,
160}
161
162impl NormalizeTransform {
163 pub fn new(centered: bool) -> Self {
165 Self {
166 min_val: None,
167 max_val: None,
168 centered,
169 }
170 }
171
172 pub fn with_range(min: f32, max: f32, centered: bool) -> Self {
174 Self {
175 min_val: Some(min),
176 max_val: Some(max),
177 centered,
178 }
179 }
180}
181
182#[async_trait]
183impl StreamTransform for NormalizeTransform {
184 async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
185 let min = self
186 .min_val
187 .unwrap_or_else(|| input.iter().copied().fold(f32::INFINITY, f32::min));
188 let max = self
189 .max_val
190 .unwrap_or_else(|| input.iter().copied().fold(f32::NEG_INFINITY, f32::max));
191
192 let range = max - min;
193 if range.abs() < 1e-10 {
194 return Ok(input);
195 }
196
197 let normalized = if self.centered {
198 (input - min) / range * 2.0 - 1.0
200 } else {
201 (input - min) / range
203 };
204
205 Ok(normalized)
206 }
207
208 fn name(&self) -> &str {
209 "NormalizeTransform"
210 }
211}
212
213pub struct DecimateTransform {
215 factor: usize,
216}
217
218impl DecimateTransform {
219 pub fn new(factor: usize) -> Self {
220 assert!(factor > 0, "Decimation factor must be > 0");
221 Self { factor }
222 }
223}
224
225#[async_trait]
226impl StreamTransform for DecimateTransform {
227 async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
228 let output: Vec<f32> = input.iter().step_by(self.factor).copied().collect();
229 Ok(Array1::from_vec(output))
230 }
231
232 fn name(&self) -> &str {
233 "DecimateTransform"
234 }
235}
236
237pub struct MovingAverageTransform {
239 window_size: usize,
240 buffer: Vec<f32>,
241}
242
243impl MovingAverageTransform {
244 pub fn new(window_size: usize) -> Self {
245 assert!(window_size > 0, "Window size must be > 0");
246 Self {
247 window_size,
248 buffer: Vec::with_capacity(window_size),
249 }
250 }
251}
252
253#[async_trait]
254impl StreamTransform for MovingAverageTransform {
255 async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
256 let mut output = Vec::with_capacity(input.len());
257
258 for &val in input.iter() {
259 self.buffer.push(val);
260 if self.buffer.len() > self.window_size {
261 self.buffer.remove(0);
262 }
263
264 let avg: f32 = self.buffer.iter().sum::<f32>() / self.buffer.len() as f32;
265 output.push(avg);
266 }
267
268 Ok(Array1::from_vec(output))
269 }
270
271 fn name(&self) -> &str {
272 "MovingAverageTransform"
273 }
274}
275
276pub struct DerivativeTransform {
278 last_value: Option<f32>,
279}
280
281impl DerivativeTransform {
282 pub fn new() -> Self {
283 Self { last_value: None }
284 }
285}
286
287impl Default for DerivativeTransform {
288 fn default() -> Self {
289 Self::new()
290 }
291}
292
293#[async_trait]
294impl StreamTransform for DerivativeTransform {
295 async fn transform(&mut self, input: Array1<f32>) -> IoResult<Array1<f32>> {
296 let mut output = Vec::with_capacity(input.len());
297
298 for &val in input.iter() {
299 if let Some(last) = self.last_value {
300 output.push(val - last);
301 } else {
302 output.push(0.0);
303 }
304 self.last_value = Some(val);
305 }
306
307 Ok(Array1::from_vec(output))
308 }
309
310 fn name(&self) -> &str {
311 "DerivativeTransform"
312 }
313}
314
315pub struct ParallelPipeline {
317 pipelines: Vec<Pipeline>,
318 combiner: CombineStrategy,
319}
320
321#[derive(Debug, Clone, Copy)]
322pub enum CombineStrategy {
323 Average,
325 Maximum,
327 Minimum,
329 Sum,
331}
332
333impl ParallelPipeline {
334 pub fn new(combiner: CombineStrategy) -> Self {
336 Self {
337 pipelines: Vec::new(),
338 combiner,
339 }
340 }
341
342 pub fn add_pipeline(mut self, pipeline: Pipeline) -> Self {
344 self.pipelines.push(pipeline);
345 self
346 }
347
348 pub async fn process(&mut self, data: Array1<f32>) -> IoResult<Array1<f32>> {
350 if self.pipelines.is_empty() {
351 return Ok(data);
352 }
353
354 let mut results = Vec::with_capacity(self.pipelines.len());
355
356 for pipeline in &mut self.pipelines {
358 let result = pipeline.process(data.clone()).await?;
359 results.push(result);
360 }
361
362 self.combine_results(results)
364 }
365
366 fn combine_results(&self, results: Vec<Array1<f32>>) -> IoResult<Array1<f32>> {
367 if results.is_empty() {
368 return Err(IoError::InvalidConfig("No results to combine".to_string()));
369 }
370
371 let len = results[0].len();
372 let mut combined = Array1::zeros(len);
373
374 match self.combiner {
375 CombineStrategy::Average => {
376 for result in &results {
377 combined += result;
378 }
379 combined /= results.len() as f32;
380 }
381 CombineStrategy::Maximum => {
382 for i in 0..len {
383 let max = results
384 .iter()
385 .map(|r| r[i])
386 .fold(f32::NEG_INFINITY, f32::max);
387 combined[i] = max;
388 }
389 }
390 CombineStrategy::Minimum => {
391 for i in 0..len {
392 let min = results.iter().map(|r| r[i]).fold(f32::INFINITY, f32::min);
393 combined[i] = min;
394 }
395 }
396 CombineStrategy::Sum => {
397 for result in &results {
398 combined += result;
399 }
400 }
401 }
402
403 Ok(combined)
404 }
405}
406
407#[cfg(test)]
408mod tests {
409 use super::*;
410
411 #[tokio::test]
412 async fn test_scale_transform() {
413 let mut transform = ScaleTransform::new(2.0);
414 let input = Array1::from_vec(vec![1.0, 2.0, 3.0]);
415 let output = transform.transform(input).await.unwrap();
416 assert_eq!(output.as_slice().unwrap(), &[2.0, 4.0, 6.0]);
417 }
418
419 #[tokio::test]
420 async fn test_pipeline() {
421 let mut pipeline = Pipeline::new()
422 .add_transform(ScaleTransform::new(2.0))
423 .add_transform(OffsetTransform::new(1.0));
424
425 let input = Array1::from_vec(vec![1.0, 2.0, 3.0]);
426 let output = pipeline.process(input).await.unwrap();
427 assert_eq!(output.as_slice().unwrap(), &[3.0, 5.0, 7.0]);
428 }
429
430 #[tokio::test]
431 async fn test_clip_transform() {
432 let mut transform = ClipTransform::new(-1.0, 1.0);
433 let input = Array1::from_vec(vec![-2.0, 0.5, 2.0]);
434 let output = transform.transform(input).await.unwrap();
435 assert_eq!(output.as_slice().unwrap(), &[-1.0, 0.5, 1.0]);
436 }
437
438 #[tokio::test]
439 async fn test_normalize_transform() {
440 let mut transform = NormalizeTransform::with_range(0.0, 10.0, false);
441 let input = Array1::from_vec(vec![0.0, 5.0, 10.0]);
442 let output = transform.transform(input).await.unwrap();
443 assert_eq!(output.as_slice().unwrap(), &[0.0, 0.5, 1.0]);
444 }
445}