swiftide_integrations/fluvio/
loader.rs1use std::string::ToString;
2
3use anyhow::Context as _;
4use futures_util::{StreamExt as _, TryStreamExt as _};
5use swiftide_core::{
6 Loader,
7 indexing::{IndexingStream, TextNode},
8};
9use tokio::runtime::Handle;
10
11use super::Fluvio;
12
13impl Loader for Fluvio {
14 type Output = String;
15
16 #[tracing::instrument]
17 fn into_stream(self) -> IndexingStream<String> {
18 let fluvio_config = self.fluvio_config;
19 let consumer_config = self.consumer_config_ext;
20
21 let stream = tokio::task::block_in_place(|| {
22 Handle::current().block_on(async {
23 let client = if let Some(fluvio_config) = &fluvio_config {
24 fluvio::Fluvio::connect_with_config(fluvio_config).await
25 } else {
26 fluvio::Fluvio::connect().await
27 }
28 .context(format!("Failed to connect to Fluvio {fluvio_config:?}"))?;
29 client.consumer_with_config(consumer_config).await
30 })
31 })
32 .expect("Failed to connect to Fluvio");
33
34 let swiftide_stream = stream
35 .map_ok(|f| {
36 let mut node = TextNode::new(f.get_value().to_string());
37 node.metadata
38 .insert("fluvio_key", f.get_key().map(ToString::to_string));
39
40 node
41 })
42 .map_err(anyhow::Error::from);
43
44 swiftide_stream.boxed().into()
45 }
46
47 fn into_stream_boxed(self: Box<Self>) -> IndexingStream<String> {
48 self.into_stream()
49 }
50}
51
52#[cfg(test)]
53mod tests {
54 use std::pin::Pin;
55
56 use super::*;
57 use anyhow::Result;
58 use fluvio::{
59 RecordKey,
60 consumer::ConsumerConfigExt,
61 metadata::{customspu::CustomSpuSpec, topic::TopicSpec},
62 };
63 use flv_util::socket_helpers::ServerAddress;
64 use futures_util::TryStreamExt;
65 use regex::Regex;
66 use testcontainers::{ContainerAsync, GenericImage, ImageExt, runners::AsyncRunner};
67 use tokio::io::{AsyncBufRead, AsyncBufReadExt};
68
69 struct FluvioCluster {
71 sc: ContainerAsync<GenericImage>,
72 spu: ContainerAsync<GenericImage>,
73
74 partitions: u32,
75 replicas: u32,
76 port: u16,
77 host_spu_port: u16,
78 client: fluvio::Fluvio,
79 }
80
81 impl FluvioCluster {
82 pub async fn start() -> Result<FluvioCluster> {
84 static SC_PORT: u16 = 9003;
85 static SPU_PORT1: u16 = 9010;
86 static SPU_PORT2: u16 = 9011;
87 static NETWORK_NAME: &str = "fluvio";
88 static PARTITIONS: u32 = 1;
89 static REPLICAS: u32 = 1;
90
91 let sc = GenericImage::new("infinyon/fluvio", "latest")
92 .with_exposed_port(SC_PORT.into())
93 .with_wait_for(testcontainers::core::WaitFor::message_on_stdout(
94 "started successfully",
95 ))
96 .with_wait_for(testcontainers::core::WaitFor::seconds(1))
97 .with_network(NETWORK_NAME)
98 .with_container_name("sc")
99 .with_cmd("./fluvio-run sc --local /fluvio/metadata".split(' '))
100 .with_env_var("RUST_LOG", "info")
101 .start()
102 .await?;
103
104 let spu = GenericImage::new("infinyon/fluvio", "latest")
105 .with_exposed_port(SPU_PORT1.into())
106 .with_wait_for(testcontainers::core::WaitFor::message_on_stdout(
107 "started successfully",
108 ))
109 .with_wait_for(testcontainers::core::WaitFor::seconds(1))
110 .with_network(NETWORK_NAME)
111 .with_container_name("spu")
112 .with_cmd(format!("./fluvio-run spu -i 5001 -p spu:{SPU_PORT1} -v spu:{SPU_PORT2} --sc-addr sc:9004 --log-base-dir /fluvio/data").split(' '))
113 .with_env_var("RUST_LOG", "info")
114 .start()
115 .await?;
116
117 let host_spu_port_1 = spu.get_host_port_ipv4(SPU_PORT1).await?;
118 let sc_host_port = sc.get_host_port_ipv4(SC_PORT).await?;
119 let endpoint = format!("127.0.0.1:{sc_host_port}");
120 let config = fluvio::FluvioConfig::new(&endpoint);
121 let client = fluvio::Fluvio::connect_with_config(&config).await?;
122
123 let cluster = FluvioCluster {
124 sc,
125 spu,
126 port: sc_host_port,
127 host_spu_port: host_spu_port_1,
128 client,
129 replicas: REPLICAS,
130 partitions: PARTITIONS,
131 };
132
133 cluster.connect_spu_to_sc().await;
134
135 Ok(cluster)
136 }
137
138 async fn connect_spu_to_sc(&self) {
139 let admin = self.client().admin().await;
140
141 let spu_spec = CustomSpuSpec {
142 id: 5001,
143 public_endpoint: ServerAddress::try_from(format!("0.0.0.0:{}", self.host_spu_port))
144 .unwrap()
145 .into(),
146 private_endpoint: ServerAddress::try_from(format!("spu:{}", 9011))
147 .unwrap()
148 .into(),
149 rack: None,
150 public_endpoint_local: None,
151 };
152
153 admin
154 .create("SPU".to_string(), false, spu_spec)
155 .await
156 .unwrap();
157 }
158
159 pub fn forward_logs_to_tracing(&self) {
160 Self::log_stdout(self.sc.stdout(true));
161 Self::log_stderr(self.sc.stderr(true));
162
163 Self::log_stdout(self.spu.stdout(true));
164 Self::log_stderr(self.spu.stderr(true));
165 }
166
167 pub fn client(&self) -> &fluvio::Fluvio {
168 &self.client
169 }
170
171 pub async fn create_topic(&self, topic_name: impl Into<String>) -> Result<()> {
172 let admin = self.client().admin().await;
173 let topic_spec = TopicSpec::new_computed(self.partitions, self.replicas, None);
174
175 admin.create(topic_name.into(), false, topic_spec).await
176 }
177
178 fn log_stdout(reader: Pin<Box<dyn AsyncBufRead + Send>>) {
179 let regex = Self::ansii_regex();
180 tokio::spawn(async move {
181 let mut lines = reader.lines();
182 while let Some(line) = lines.next_line().await.unwrap() {
183 let line = regex.replace_all(&line, "").to_string();
184 tracing::info!(line);
185 }
186 });
187 }
188
189 fn log_stderr(reader: Pin<Box<dyn AsyncBufRead + Send>>) {
190 let regex = Self::ansii_regex();
191 tokio::spawn(async move {
192 let mut lines = reader.lines();
193 while let Some(line) = lines.next_line().await.unwrap() {
194 let line = regex.replace_all(&line, "").to_string();
195 tracing::error!(line);
196 }
197 });
198 }
199
200 fn ansii_regex() -> Regex {
201 regex::Regex::new(r"\x1b\[([\x30-\x3f]*[\x20-\x2f]*[\x40-\x7e])").unwrap()
202 }
203
204 pub fn endpoint(&self) -> String {
205 format!("127.0.0.1:{}", self.port)
206 }
207 }
208
209 #[test_log::test(tokio::test(flavor = "multi_thread"))]
210 async fn test_fluvio_loader() {
211 static TOPIC_NAME: &str = "hello-rust";
212 static PARTITION_NUM: u32 = 0;
213
214 let fluvio_cluster = FluvioCluster::start()
215 .await
216 .expect("Failed to start Fluvio cluster");
217
218 fluvio_cluster.forward_logs_to_tracing();
219 fluvio_cluster.create_topic(TOPIC_NAME).await.unwrap();
220
221 let client = fluvio_cluster.client();
222
223 let producer = client.topic_producer(TOPIC_NAME).await.unwrap();
224 producer
225 .send(RecordKey::NULL, "Hello fluvio")
226 .await
227 .unwrap();
228 producer.flush().await.unwrap();
229
230 let config = fluvio::FluvioConfig::new(fluvio_cluster.endpoint());
232 let loader = Fluvio::builder()
233 .fluvio_config(&config)
234 .consumer_config_ext(
235 ConsumerConfigExt::builder()
236 .topic(TOPIC_NAME)
237 .partition(PARTITION_NUM)
238 .offset_start(fluvio::Offset::from_end(1))
239 .build()
240 .unwrap(),
241 )
242 .build()
243 .unwrap();
244
245 let node: TextNode = loader.into_stream().try_next().await.unwrap().unwrap();
246
247 assert_eq!(node.chunk, "Hello fluvio");
248 }
249}