arkflow_plugin/output/
drop.rs

//! Drop output component
//!
//! This component discards all messages without performing any operations.
//! It's useful for testing or when you want to intentionally discard data.
5
6use arkflow_core::output::{register_output_builder, Output, OutputBuilder};
7use arkflow_core::{Error, MessageBatch};
8use async_trait::async_trait;
9use std::sync::Arc;
10
/// Drop output component that discards all messages.
///
/// This component implements the `Output` trait but doesn't perform any actual
/// output operations. All messages sent to this output are simply discarded.
///
/// The type is a stateless unit struct, so it derives the common traits
/// (`Debug`, `Clone`, `Copy`, `Default`) to make it easy to log, duplicate,
/// and construct generically.
#[derive(Debug, Clone, Copy, Default)]
pub struct DropOutput;
16
17#[async_trait]
18impl Output for DropOutput {
19    async fn connect(&self) -> Result<(), Error> {
20        Ok(())
21    }
22
23    async fn write(&self, _: &MessageBatch) -> Result<(), Error> {
24        Ok(())
25    }
26
27    async fn close(&self) -> Result<(), Error> {
28        Ok(())
29    }
30}
31
32pub(crate) struct DropOutputBuilder;
33impl OutputBuilder for DropOutputBuilder {
34    fn build(&self, _: &Option<serde_json::Value>) -> Result<Arc<dyn Output>, Error> {
35        Ok(Arc::new(DropOutput))
36    }
37}
38
39pub fn init() {
40    register_output_builder("drop", Arc::new(DropOutputBuilder));
41}
42
#[cfg(test)]
mod tests {
    use crate::output::drop::DropOutput;

    use arkflow_core::output::Output;
    use arkflow_core::MessageBatch;
    use datafusion::arrow::array::{Int32Array, StringArray};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use std::sync::Arc;

    /// Builds the single-message binary batch shared by the binary-data tests.
    fn sample_binary_batch() -> MessageBatch {
        MessageBatch::new_binary(vec![b"test message".to_vec()])
    }

    /// `connect()` on a fresh output must succeed.
    #[tokio::test]
    async fn test_drop_output_connect() {
        let sink = DropOutput;

        assert!(
            sink.connect().await.is_ok(),
            "connect() should return Ok(())"
        );
    }

    /// Writing a binary batch must succeed.
    #[tokio::test]
    async fn test_drop_output_write_binary() {
        let sink = DropOutput;
        let batch = sample_binary_batch();

        assert!(
            sink.write(&batch).await.is_ok(),
            "write() should return Ok(()) for binary data"
        );
    }

    /// Writing an Arrow-backed batch must succeed.
    #[tokio::test]
    async fn test_drop_output_write_arrow() {
        let sink = DropOutput;

        // Two non-nullable columns: an Int32 `id` and a Utf8 `name`.
        let fields = vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ];
        let schema = Arc::new(Schema::new(fields));

        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
        let record_batch =
            RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(names)]).unwrap();

        let batch = MessageBatch::new_arrow(record_batch);

        assert!(
            sink.write(&batch).await.is_ok(),
            "write() should return Ok(()) for Arrow data"
        );
    }

    /// `close()` on a fresh output must succeed.
    #[tokio::test]
    async fn test_drop_output_close() {
        let sink = DropOutput;

        assert!(sink.close().await.is_ok(), "close() should return Ok(())");
    }

    /// Exercises connect -> repeated write -> close on a single instance.
    #[tokio::test]
    async fn test_drop_output_full_lifecycle() {
        let sink = DropOutput;

        assert!(
            sink.connect().await.is_ok(),
            "connect() should return Ok(())"
        );

        // The same batch is written several times in a row.
        let batch = sample_binary_batch();
        for _attempt in 0..5 {
            assert!(
                sink.write(&batch).await.is_ok(),
                "write() should return Ok(())"
            );
        }

        assert!(sink.close().await.is_ok(), "close() should return Ok(())");
    }
}