1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3use std::io::Result;
32
33#[cfg(feature = "download_binaries")]
34mod download_binaries;
35
36use influxive_core::*;
37use influxive_writer::*;
38
39pub use influxive_writer::InfluxiveWriterConfig;
40
/// Run an external command to completion and capture its stdout.
///
/// The first argument is the program, the remaining arguments are passed to
/// it one by one. Stdin is closed and the process is killed if the future is
/// dropped before it finishes.
///
/// Expands to an awaited `async` block, so it must be used inside an async
/// context. The expression evaluates to a `Result` holding the (lossily
/// UTF-8 decoded) stdout on success.
///
/// An error is produced when:
/// - the process could not be spawned or waited on,
/// - the process wrote anything to stderr (the stderr text is the error), or
/// - the process exited unsuccessfully without writing to stderr.
macro_rules! cmd_output {
    ($cmd:expr $(,$arg:expr)*) => {async {
        let mut proc = tokio::process::Command::new($cmd);
        proc.stdin(std::process::Stdio::null());
        proc.kill_on_drop(true);
        $(
            proc.arg($arg);
        )*
        let output = proc.output().await?;
        let err = String::from_utf8_lossy(&output.stderr);
        if !err.is_empty() {
            Err(err_other(err.to_string()))
        } else if !output.status.success() {
            // A failing command that stays silent on stderr must not be
            // mistaken for success.
            Err(err_other(format!("command failed: {}", output.status)))
        } else {
            Ok(String::from_utf8_lossy(&output.stdout).to_string())
        }
    }.await}
}
58
59#[derive(Debug)]
61#[non_exhaustive]
62pub struct InfluxiveChildSvcConfig {
63 #[cfg(feature = "download_binaries")]
64 pub download_binaries: bool,
67
68 pub influxd_path: Option<std::path::PathBuf>,
71
72 pub influx_path: Option<std::path::PathBuf>,
75
76 pub database_path: Option<std::path::PathBuf>,
80
81 pub user: String,
84
85 pub pass: String,
88
89 pub org: String,
92
93 pub bucket: String,
96
97 pub retention: String,
100
101 pub metric_write: InfluxiveWriterConfig,
103}
104
105impl Default for InfluxiveChildSvcConfig {
106 fn default() -> Self {
107 Self {
108 download_binaries: true,
109 influxd_path: None,
110 influx_path: None,
111 database_path: None,
112 user: "influxive".to_string(),
113 pass: "influxive".to_string(),
114 org: "influxive".to_string(),
115 bucket: "influxive".to_string(),
116 retention: "72h".to_string(),
117 metric_write: InfluxiveWriterConfig::default(),
118 }
119 }
120}
121
122impl InfluxiveChildSvcConfig {
123 pub fn with_download_binaries(mut self, download_binaries: bool) -> Self {
125 self.download_binaries = download_binaries;
126 self
127 }
128
129 pub fn with_influxd_path(
131 mut self,
132 influxd_path: Option<std::path::PathBuf>,
133 ) -> Self {
134 self.influxd_path = influxd_path;
135 self
136 }
137
138 pub fn with_influx_path(
140 mut self,
141 influx_path: Option<std::path::PathBuf>,
142 ) -> Self {
143 self.influx_path = influx_path;
144 self
145 }
146
147 pub fn with_database_path(
149 mut self,
150 database_path: Option<std::path::PathBuf>,
151 ) -> Self {
152 self.database_path = database_path;
153 self
154 }
155
156 pub fn with_user(mut self, user: String) -> Self {
158 self.user = user;
159 self
160 }
161
162 pub fn with_pass(mut self, pass: String) -> Self {
164 self.pass = pass;
165 self
166 }
167
168 pub fn with_org(mut self, org: String) -> Self {
170 self.org = org;
171 self
172 }
173
174 pub fn with_bucket(mut self, bucket: String) -> Self {
176 self.bucket = bucket;
177 self
178 }
179
180 pub fn with_retention(mut self, retention: String) -> Self {
182 self.retention = retention;
183 self
184 }
185
186 pub fn with_metric_write(
188 mut self,
189 metric_write: InfluxiveWriterConfig,
190 ) -> Self {
191 self.metric_write = metric_write;
192 self
193 }
194}
195
/// Handle to an `influxd` server running as a child process, together with
/// everything needed to talk to it through the `influx` command line client
/// and the metric writer.
pub struct InfluxiveChildSvc {
    /// The configuration this service was created with.
    config: InfluxiveChildSvcConfig,
    /// Base URL of the server, of the form `http://127.0.0.1:{port}`.
    host: String,
    /// Auth token read from the `influx` client configs file.
    token: String,
    /// The running `influxd` process; set to `None` once shut down.
    child: std::sync::Mutex<Option<tokio::process::Child>>,
    /// Validated path of the `influx` command line client.
    influx_path: std::path::PathBuf,
    /// Writer used to send metrics to the server.
    writer: InfluxiveWriter,
}
207
208impl InfluxiveChildSvc {
209 pub async fn new(config: InfluxiveChildSvcConfig) -> Result<Self> {
211 let db_path = config.database_path.clone().unwrap_or_else(|| {
212 let mut db_path = std::path::PathBuf::from(".");
213 db_path.push("influxive");
214 db_path
215 });
216
217 tokio::fs::create_dir_all(&db_path).await?;
218
219 let influxd_path = validate_influx(&db_path, &config, false).await?;
220
221 let influx_path = validate_influx(&db_path, &config, true).await?;
222
223 let (child, port) = spawn_influxd(&db_path, &influxd_path).await?;
224
225 let host = format!("http://127.0.0.1:{port}");
226
227 let mut configs_path = std::path::PathBuf::from(&db_path);
228 configs_path.push("configs");
229
230 if let Err(err) = cmd_output!(
231 &influx_path,
232 "setup",
233 "--json",
234 "--configs-path",
235 &configs_path,
236 "--host",
237 &host,
238 "--username",
239 &config.user,
240 "--password",
241 &config.pass,
242 "--org",
243 &config.org,
244 "--bucket",
245 &config.bucket,
246 "--retention",
247 &config.retention,
248 "--force"
249 ) {
250 let repr = format!("{err:?}");
251 if !repr.contains("Error: instance has already been set up") {
252 return Err(err);
253 }
254 }
255
256 let token = tokio::fs::read(&configs_path).await?;
257 let token = String::from_utf8_lossy(&token);
258 let mut token = token.split("token = \"");
259 token.next().unwrap();
260 let token = token.next().unwrap();
261 let mut token = token.split('\"');
262 let token = token.next().unwrap().to_string();
263
264 let writer = InfluxiveWriter::with_token_auth(
265 config.metric_write.clone(),
266 &host,
267 &config.bucket,
268 &token,
269 );
270
271 let bucket = config.bucket.clone();
272
273 let this = Self {
274 config,
275 host,
276 token,
277 child: std::sync::Mutex::new(Some(child)),
278 influx_path,
279 writer,
280 };
281
282 let mut millis = 10;
283
284 for _ in 0..10 {
285 this.write_metric(
287 Metric::new(std::time::SystemTime::now(), "influxive.start")
288 .with_field("value", true),
289 );
290
291 if let Ok(result) = this
292 .query(format!(
293 r#"from(bucket: "{bucket}")
294|> range(start: -15m, stop: now())
295|> filter(fn: (r) => r["_measurement"] == "influxive.start")
296|> filter(fn: (r) => r["_field"] == "value")"#,
297 ))
298 .await
299 {
300 if result.split('\n').count() >= 3 {
301 return Ok(this);
302 }
303 }
304
305 tokio::time::sleep(std::time::Duration::from_millis(millis)).await;
306 millis *= 2;
307 }
308
309 Err(err_other("Unable to start influxd"))
310 }
311
312 pub fn shutdown(&self) {
314 drop(self.child.lock().unwrap().take());
315 }
316
317 pub fn get_config(&self) -> &InfluxiveChildSvcConfig {
319 &self.config
320 }
321
322 pub fn get_host(&self) -> &str {
324 &self.host
325 }
326
327 pub fn get_token(&self) -> &str {
329 &self.token
330 }
331
332 pub async fn ping(&self) -> Result<()> {
334 cmd_output!(&self.influx_path, "ping", "--host", &self.host)?;
335 Ok(())
336 }
337
338 pub async fn query<Q: Into<StringType>>(
342 &self,
343 flux_query: Q,
344 ) -> Result<String> {
345 cmd_output!(
346 &self.influx_path,
347 "query",
348 "--raw",
349 "--org",
350 &self.config.org,
351 "--host",
352 &self.host,
353 "--token",
354 &self.token,
355 flux_query.into().into_string()
356 )
357 }
358
359 pub async fn list_dashboards(&self) -> Result<String> {
361 cmd_output!(
362 &self.influx_path,
363 "dashboards",
364 "--org",
365 &self.config.org,
366 "--host",
367 &self.host,
368 "--token",
369 &self.token,
370 "--json"
371 )
372 }
373
374 pub async fn apply(&self, template: &[u8]) -> Result<String> {
376 use tokio::io::AsyncWriteExt;
377
378 let (file, tmp) = tempfile::Builder::new()
379 .suffix(".json")
380 .tempfile()?
381 .into_parts();
382 let mut file = tokio::fs::File::from_std(file);
383
384 file.write_all(template).await?;
385 file.shutdown().await?;
386
387 let result = cmd_output!(
388 &self.influx_path,
389 "apply",
390 "--org",
391 &self.config.org,
392 "--host",
393 &self.host,
394 "--token",
395 &self.token,
396 "--json",
397 "--force",
398 "yes",
399 "--file",
400 &tmp
401 );
402
403 drop(file);
404
405 let _ = tmp.close();
407
408 result
409 }
410
411 pub fn write_metric(&self, metric: Metric) {
417 self.writer.write_metric(metric);
418 }
419}
420
421impl MetricWriter for InfluxiveChildSvc {
422 fn write_metric(&self, metric: Metric) {
423 InfluxiveChildSvc::write_metric(self, metric);
424 }
425}
426
/// Download the `influxd` binary (or the `influx` client when `is_cli` is
/// set) and check that it runs.
///
/// On a successful download `bin_path` is overwritten with the downloaded
/// path, and the binary is run with the `version` argument. Returns that
/// command's output, or `None` on any failure. Every failure appends a
/// short description to `err_list`, followed by the underlying error where
/// there is one.
#[cfg(feature = "download_binaries")]
async fn dl_influx(
    db_path: &std::path::Path,
    is_cli: bool,
    bin_path: &mut std::path::PathBuf,
    err_list: &mut Vec<std::io::Error>,
) -> Option<String> {
    let maybe_spec = if is_cli {
        &download_binaries::DL_CLI
    } else {
        &download_binaries::DL_DB
    };

    // Guard: nothing to download for this platform.
    let dl_spec = match maybe_spec {
        Some(dl_spec) => dl_spec,
        None => {
            err_list.push(err_other(
                "no download configured for this target os/arch",
            ));
            return None;
        }
    };

    // Guard: the download itself failed.
    let downloaded = match dl_spec.download(db_path).await {
        Ok(downloaded) => downloaded,
        Err(dl_err) => {
            err_list.push(err_other("failed to download"));
            err_list.push(dl_err);
            return None;
        }
    };

    *bin_path = downloaded;

    match cmd_output!(&bin_path, "version") {
        Ok(version) => Some(version),
        Err(run_err) => {
            err_list.push(err_other(format!("failed to run {bin_path:?}")));
            err_list.push(run_err);
            None
        }
    }
}
466
467async fn validate_influx(
468 _db_path: &std::path::Path,
469 config: &InfluxiveChildSvcConfig,
470 is_cli: bool,
471) -> Result<std::path::PathBuf> {
472 let mut bin_path = if is_cli {
473 "influx".into()
474 } else {
475 "influxd".into()
476 };
477
478 if is_cli {
479 if let Some(path) = &config.influx_path {
480 bin_path = path.clone();
481 }
482 } else if let Some(path) = &config.influxd_path {
483 bin_path = path.clone();
484 };
485
486 let ver = match cmd_output!(&bin_path, "version") {
487 Ok(ver) => ver,
488 Err(err) => {
489 let mut err_list = Vec::new();
490 err_list.push(err_other(format!("failed to run {bin_path:?}")));
491 err_list.push(err);
492
493 #[cfg(feature = "download_binaries")]
494 {
495 if let Some(ver) =
496 dl_influx(_db_path, is_cli, &mut bin_path, &mut err_list)
497 .await
498 {
499 ver
500 } else {
501 return Err(err_other(format!("{err_list:?}",)));
502 }
503 }
504
505 #[cfg(not(feature = "download_binaries"))]
506 {
507 return Err(err_other(format!("{err_list:?}",)));
508 }
509 }
510 };
511
512 if is_cli && !ver.contains("build_date: 2023-04-28") {
514 return Err(err_other(format!("invalid build_date: {ver}")));
515 } else if !is_cli && !ver.contains("InfluxDB v2.7.6") {
516 return Err(err_other(format!("invalid version: {ver}")));
517 }
518
519 Ok(bin_path)
520}
521
522async fn spawn_influxd(
523 db_path: &std::path::Path,
524 influxd_path: &std::path::Path,
525) -> Result<(tokio::process::Child, u16)> {
526 use tokio::io::AsyncBufReadExt;
527
528 let (s, r) = tokio::sync::oneshot::channel();
529
530 let mut s = Some(s);
531
532 let mut engine_path = std::path::PathBuf::from(db_path);
533 engine_path.push("engine");
534 let mut bolt_path = std::path::PathBuf::from(db_path);
535 bolt_path.push("influxd.bolt");
536
537 let mut proc = tokio::process::Command::new(influxd_path);
538 proc.kill_on_drop(true);
539 proc.arg("--engine-path").arg(engine_path);
540 proc.arg("--bolt-path").arg(bolt_path);
541 proc.arg("--http-bind-address").arg("127.0.0.1:0");
542 proc.arg("--metrics-disabled");
543 proc.arg("--reporting-disabled");
544 proc.stdout(std::process::Stdio::piped());
545
546 let proc_err = format!("{proc:?}");
547
548 let mut child = proc
549 .spawn()
550 .map_err(|err| err_other(format!("{proc_err}: {err:?}")))?;
551
552 let stdout = child.stdout.take().unwrap();
553 let mut reader = tokio::io::BufReader::new(stdout).lines();
554
555 tokio::task::spawn(async move {
556 while let Some(line) = reader.next_line().await.expect("got line") {
557 tracing::trace!(?line, "influxd stdout");
558 if line.contains("msg=Listening")
559 && line.contains("service=tcp-listener")
560 && line.contains("transport=http")
561 {
562 let mut iter = line.split(" port=");
563 iter.next().unwrap();
564 let item = iter.next().unwrap();
565 let port: u16 = item.parse().unwrap();
566 if let Some(s) = s.take() {
567 let _ = s.send(port);
568 }
569 }
570 }
571 });
572
573 let port = r.await.map_err(|_| err_other("Failed to scrape port"))?;
574
575 Ok((child, port))
576}
577
578#[cfg(test)]
579mod test;