// arkflow_plugin/output/drop.rs
use arkflow_core::output::{register_output_builder, Output, OutputBuilder};
use arkflow_core::{Error, MessageBatch};
use async_trait::async_trait;
use std::sync::Arc;

/// Output component that carries no state.
///
/// It is a unit struct, so it can be constructed directly as `DropOutput`
/// (or through `Default`). The common traits are derived so the type can be
/// logged, copied and default-constructed like any other plain value.
#[derive(Debug, Default, Clone, Copy)]
pub struct DropOutput;

17#[async_trait]
18impl Output for DropOutput {
19 async fn connect(&self) -> Result<(), Error> {
20 Ok(())
21 }
22
23 async fn write(&self, _: &MessageBatch) -> Result<(), Error> {
24 Ok(())
25 }
26
27 async fn close(&self) -> Result<(), Error> {
28 Ok(())
29 }
30}
31
32pub(crate) struct DropOutputBuilder;
33impl OutputBuilder for DropOutputBuilder {
34 fn build(&self, _: &Option<serde_json::Value>) -> Result<Arc<dyn Output>, Error> {
35 Ok(Arc::new(DropOutput))
36 }
37}
38
39pub fn init() {
40 register_output_builder("drop", Arc::new(DropOutputBuilder));
41}
42
/// Unit tests: every `Output` method on `DropOutput` must succeed.
#[cfg(test)]
mod tests {
    use crate::output::drop::DropOutput;

    use arkflow_core::output::Output;
    use arkflow_core::MessageBatch;
    use datafusion::arrow::array::{Int32Array, StringArray};
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::arrow::record_batch::RecordBatch;
    use std::sync::Arc;

    /// `connect()` succeeds on a fresh instance.
    #[tokio::test]
    async fn test_drop_output_connect() {
        let drop_output = DropOutput;

        let result = drop_output.connect().await;
        assert!(result.is_ok(), "connect() should return Ok(())");
    }

    /// `write()` succeeds for a batch built from raw bytes.
    #[tokio::test]
    async fn test_drop_output_write_binary() {
        let drop_output = DropOutput;

        let binary_data = vec![b"test message".to_vec()];
        let message_batch = MessageBatch::new_binary(binary_data);

        let result = drop_output.write(&message_batch).await;
        assert!(
            result.is_ok(),
            "write() should return Ok(()) for binary data"
        );
    }

    /// `write()` succeeds for a batch built from an Arrow `RecordBatch`.
    #[tokio::test]
    async fn test_drop_output_write_arrow() {
        let drop_output = DropOutput;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));

        let id_array = Int32Array::from(vec![1, 2, 3]);
        let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);

        // Both columns have three rows and match the schema's types, so
        // construction failing here would be a bug in the test itself.
        let batch = RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)])
            .expect("columns match the declared schema");

        let message_batch = MessageBatch::new_arrow(batch);

        let result = drop_output.write(&message_batch).await;
        assert!(
            result.is_ok(),
            "write() should return Ok(()) for Arrow data"
        );
    }

    /// `close()` succeeds on a fresh instance.
    #[tokio::test]
    async fn test_drop_output_close() {
        let drop_output = DropOutput;

        let result = drop_output.close().await;
        assert!(result.is_ok(), "close() should return Ok(())");
    }

    /// connect, then repeated writes, then close all succeed in sequence.
    #[tokio::test]
    async fn test_drop_output_full_lifecycle() {
        let drop_output = DropOutput;

        let connect_result = drop_output.connect().await;
        assert!(connect_result.is_ok(), "connect() should return Ok(())");

        let binary_data = vec![b"test message".to_vec()];
        let message_batch = MessageBatch::new_binary(binary_data);

        for _ in 0..5 {
            let write_result = drop_output.write(&message_batch).await;
            assert!(write_result.is_ok(), "write() should return Ok(())");
        }

        let close_result = drop_output.close().await;
        assert!(close_result.is_ok(), "close() should return Ok(())");
    }
}