swiftide_integrations/fluvio/
loader.rs

1use std::string::ToString;
2
3use anyhow::Context as _;
4use futures_util::{StreamExt as _, TryStreamExt as _};
5use swiftide_core::{
6    Loader,
7    indexing::{IndexingStream, TextNode},
8};
9use tokio::runtime::Handle;
10
11use super::Fluvio;
12
13impl Loader for Fluvio {
14    type Output = String;
15
16    #[tracing::instrument]
17    fn into_stream(self) -> IndexingStream<String> {
18        let fluvio_config = self.fluvio_config;
19        let consumer_config = self.consumer_config_ext;
20
21        let stream = tokio::task::block_in_place(|| {
22            Handle::current().block_on(async {
23                let client = if let Some(fluvio_config) = &fluvio_config {
24                    fluvio::Fluvio::connect_with_config(fluvio_config).await
25                } else {
26                    fluvio::Fluvio::connect().await
27                }
28                .context(format!("Failed to connect to Fluvio {fluvio_config:?}"))?;
29                client.consumer_with_config(consumer_config).await
30            })
31        })
32        .expect("Failed to connect to Fluvio");
33
34        let swiftide_stream = stream
35            .map_ok(|f| {
36                let mut node = TextNode::new(f.get_value().to_string());
37                node.metadata
38                    .insert("fluvio_key", f.get_key().map(ToString::to_string));
39
40                node
41            })
42            .map_err(anyhow::Error::from);
43
44        swiftide_stream.boxed().into()
45    }
46
47    fn into_stream_boxed(self: Box<Self>) -> IndexingStream<String> {
48        self.into_stream()
49    }
50}
51
52#[cfg(test)]
53mod tests {
54    use std::pin::Pin;
55
56    use super::*;
57    use anyhow::Result;
58    use fluvio::{
59        RecordKey,
60        consumer::ConsumerConfigExt,
61        metadata::{customspu::CustomSpuSpec, topic::TopicSpec},
62    };
63    use flv_util::socket_helpers::ServerAddress;
64    use futures_util::TryStreamExt;
65    use regex::Regex;
66    use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner};
67    use tokio::io::{AsyncBufRead, AsyncBufReadExt};
68
69    // NOTE: Move to test-utils / upstream to testcontainers if needed elsewhere
    /// Test fixture holding the two containers of a Fluvio cluster plus a connected client.
    ///
    /// Built by `FluvioCluster::start`, which launches the containers, connects the
    /// client and registers the SPU with the SC.
    struct FluvioCluster {
        /// Container started with `fluvio-run sc`.
        sc: ContainerAsync<GenericImage>,
        /// Container started with `fluvio-run spu`.
        spu: ContainerAsync<GenericImage>,

        /// Partition count passed to `TopicSpec::new_computed` when creating topics.
        partitions: u32,
        /// Replica count passed to `TopicSpec::new_computed` when creating topics.
        replicas: u32,
        /// Host port mapped to the SC container's exposed port; used by `endpoint()`.
        port: u16,
        /// Host port mapped to the SPU container's exposed port; registered as the
        /// SPU's public endpoint.
        host_spu_port: u16,
        /// Client connected to the SC through `127.0.0.1:{port}`.
        client: fluvio::Fluvio,
    }
80
81    impl FluvioCluster {
82        // Starts a fluvio cluster and connects the spu to the sc
83        pub async fn start() -> Result<FluvioCluster> {
84            static SC_PORT: u16 = 9003;
85            static SPU_PORT1: u16 = 9010;
86            static SPU_PORT2: u16 = 9011;
87            static NETWORK_NAME: &str = "fluvio";
88            static PARTITIONS: u32 = 1;
89            static REPLICAS: u32 = 1;
90
91            let sc = GenericImage::new("infinyon/fluvio", "latest")
92                .with_exposed_port(SC_PORT.into())
93                .with_wait_for(testcontainers::core::WaitFor::message_on_stdout(
94                    "started successfully",
95                ))
96                .with_wait_for(testcontainers::core::WaitFor::seconds(1))
97                .with_network(NETWORK_NAME)
98                .with_container_name("sc")
99                .with_cmd("./fluvio-run sc --local /fluvio/metadata".split(' '))
100                .with_env_var("RUST_LOG", "info")
101                .start()
102                .await?;
103
104            let spu = GenericImage::new("infinyon/fluvio", "latest")
105                .with_exposed_port(SPU_PORT1.into())
106                .with_wait_for(testcontainers::core::WaitFor::message_on_stdout(
107                    "started successfully",
108                ))
109                    .with_wait_for(testcontainers::core::WaitFor::seconds(1))
110                .with_network(NETWORK_NAME)
111                .with_container_name("spu")
112                .with_cmd(format!("./fluvio-run spu -i 5001 -p spu:{SPU_PORT1} -v spu:{SPU_PORT2} --sc-addr sc:9004 --log-base-dir /fluvio/data").split(' '))
113                .with_env_var("RUST_LOG", "info")
114                .start()
115                .await?;
116
117            let host_spu_port_1 = spu.get_host_port_ipv4(SPU_PORT1).await?;
118            let sc_host_port = sc.get_host_port_ipv4(SC_PORT).await?;
119            let endpoint = format!("127.0.0.1:{sc_host_port}");
120            let config = fluvio::FluvioConfig::new(&endpoint);
121            let client = fluvio::Fluvio::connect_with_config(&config).await?;
122
123            let cluster = FluvioCluster {
124                sc,
125                spu,
126                port: sc_host_port,
127                host_spu_port: host_spu_port_1,
128                client,
129                replicas: REPLICAS,
130                partitions: PARTITIONS,
131            };
132
133            cluster.connect_spu_to_sc().await;
134
135            Ok(cluster)
136        }
137
138        async fn connect_spu_to_sc(&self) {
139            let admin = self.client().admin().await;
140
141            let spu_spec = CustomSpuSpec {
142                id: 5001,
143                public_endpoint: ServerAddress::try_from(format!("0.0.0.0:{}", self.host_spu_port))
144                    .unwrap()
145                    .into(),
146                private_endpoint: ServerAddress::try_from(format!("spu:{}", 9011))
147                    .unwrap()
148                    .into(),
149                rack: None,
150                public_endpoint_local: None,
151            };
152
153            admin
154                .create("SPU".to_string(), false, spu_spec)
155                .await
156                .unwrap();
157        }
158
159        pub fn forward_logs_to_tracing(&self) {
160            Self::log_stdout(self.sc.stdout(true));
161            Self::log_stderr(self.sc.stderr(true));
162
163            Self::log_stdout(self.spu.stdout(true));
164            Self::log_stderr(self.spu.stderr(true));
165        }
166
167        pub fn client(&self) -> &fluvio::Fluvio {
168            &self.client
169        }
170
171        pub async fn create_topic(&self, topic_name: impl Into<String>) -> Result<()> {
172            let admin = self.client().admin().await;
173            let topic_spec = TopicSpec::new_computed(self.partitions, self.replicas, None);
174
175            admin.create(topic_name.into(), false, topic_spec).await
176        }
177
178        fn log_stdout(reader: Pin<Box<dyn AsyncBufRead + Send>>) {
179            let regex = Self::ansii_regex();
180            tokio::spawn(async move {
181                let mut lines = reader.lines();
182                while let Some(line) = lines.next_line().await.unwrap() {
183                    let line = regex.replace_all(&line, "").to_string();
184                    tracing::info!(line);
185                }
186            });
187        }
188
189        fn log_stderr(reader: Pin<Box<dyn AsyncBufRead + Send>>) {
190            let regex = Self::ansii_regex();
191            tokio::spawn(async move {
192                let mut lines = reader.lines();
193                while let Some(line) = lines.next_line().await.unwrap() {
194                    let line = regex.replace_all(&line, "").to_string();
195                    tracing::error!(line);
196                }
197            });
198        }
199
200        fn ansii_regex() -> Regex {
201            regex::Regex::new(r"\x1b\[([\x30-\x3f]*[\x20-\x2f]*[\x40-\x7e])").unwrap()
202        }
203
204        pub fn endpoint(&self) -> String {
205            format!("127.0.0.1:{}", self.port)
206        }
207    }
208
209    #[test_log::test(tokio::test(flavor = "multi_thread"))]
210    async fn test_fluvio_loader() {
211        static TOPIC_NAME: &str = "hello-rust";
212        static PARTITION_NUM: u32 = 0;
213
214        let fluvio_cluster = FluvioCluster::start()
215            .await
216            .expect("Failed to start Fluvio cluster");
217
218        fluvio_cluster.forward_logs_to_tracing();
219        fluvio_cluster.create_topic(TOPIC_NAME).await.unwrap();
220
221        let client = fluvio_cluster.client();
222
223        let producer = client.topic_producer(TOPIC_NAME).await.unwrap();
224        producer
225            .send(RecordKey::NULL, "Hello fluvio")
226            .await
227            .unwrap();
228        producer.flush().await.unwrap();
229
230        // Consume the topic with the loader
231        let config = fluvio::FluvioConfig::new(fluvio_cluster.endpoint());
232        let loader = Fluvio::builder()
233            .fluvio_config(&config)
234            .consumer_config_ext(
235                ConsumerConfigExt::builder()
236                    .topic(TOPIC_NAME)
237                    .partition(PARTITION_NUM)
238                    .offset_start(fluvio::Offset::from_end(1))
239                    .build()
240                    .unwrap(),
241            )
242            .build()
243            .unwrap();
244
245        let node: TextNode = loader.into_stream().try_next().await.unwrap().unwrap();
246
247        assert_eq!(node.chunk, "Hello fluvio");
248    }
249}