influxive_child_svc/
lib.rs

1#![deny(missing_docs)]
2#![deny(unsafe_code)]
3//! Run influxd as a child process.
4//!
5//! ## Example
6//!
7//! ```
8//! # #[tokio::main(flavor = "multi_thread")]
9//! # async fn main() {
10//! use influxive_core::Metric;
11//! use influxive_child_svc::*;
12//!
13//! let tmp = tempfile::tempdir().unwrap();
14//!
15//! let influxive = InfluxiveChildSvc::new(
16//!     InfluxiveChildSvcConfig::default()
17//!         .with_database_path(Some(tmp.path().to_owned())),
18//! ).await.unwrap();
19//!
20//! influxive.write_metric(
21//!     Metric::new(
22//!         std::time::SystemTime::now(),
23//!         "my.metric",
24//!     )
25//!     .with_field("value", 3.14)
26//!     .with_tag("tag", "test-tag")
27//! );
28//! # }
29//! ```
30
31use std::io::Result;
32
33#[cfg(feature = "download_binaries")]
34mod download_binaries;
35
36use influxive_core::*;
37use influxive_writer::*;
38
39pub use influxive_writer::InfluxiveWriterConfig;
40
41macro_rules! cmd_output {
42    ($cmd:expr $(,$arg:expr)*) => {async {
43        let mut proc = tokio::process::Command::new($cmd);
44        proc.stdin(std::process::Stdio::null());
45        proc.kill_on_drop(true);
46        $(
47            proc.arg($arg);
48        )*
49        let output = proc.output().await?;
50        let err = String::from_utf8_lossy(&output.stderr);
51        if !err.is_empty() {
52            Err(err_other(err.to_string()))
53        } else {
54            Ok(String::from_utf8_lossy(&output.stdout).to_string())
55        }
56    }.await}
57}
58
59/// Configure the child process.
60#[derive(Debug)]
61#[non_exhaustive]
62pub struct InfluxiveChildSvcConfig {
63    #[cfg(feature = "download_binaries")]
64    /// If true, will fall back to downloading influx release binaries.
65    /// Defaults to `true`.
66    pub download_binaries: bool,
67
68    /// Path to influxd binary. If None, will try the path.
69    /// Defaults to `None`.
70    pub influxd_path: Option<std::path::PathBuf>,
71
72    /// Path to influx cli binary. If None, will try the path.
73    /// Defaults to `None`.
74    pub influx_path: Option<std::path::PathBuf>,
75
76    /// Path to influx database files and config directory. If None, will
77    /// use `./influxive`.
78    /// Defaults to `None`.
79    pub database_path: Option<std::path::PathBuf>,
80
81    /// Influx initial username.
82    /// Defaults to `influxive`.
83    pub user: String,
84
85    /// Influx initial password.
86    /// Defaults to `influxive`.
87    pub pass: String,
88
89    /// Influx initial organization name.
90    /// Defaults to `influxive`.
91    pub org: String,
92
93    /// Influx initial bucket name.
94    /// Defaults to `influxive`.
95    pub bucket: String,
96
97    /// Retention timespan.
98    /// Defaults to `72h`.
99    pub retention: String,
100
101    /// The influxive metric writer configuration.
102    pub metric_write: InfluxiveWriterConfig,
103}
104
105impl Default for InfluxiveChildSvcConfig {
106    fn default() -> Self {
107        Self {
108            download_binaries: true,
109            influxd_path: None,
110            influx_path: None,
111            database_path: None,
112            user: "influxive".to_string(),
113            pass: "influxive".to_string(),
114            org: "influxive".to_string(),
115            bucket: "influxive".to_string(),
116            retention: "72h".to_string(),
117            metric_write: InfluxiveWriterConfig::default(),
118        }
119    }
120}
121
122impl InfluxiveChildSvcConfig {
123    /// Apply [InfluxiveChildSvcConfig::download_binaries].
124    pub fn with_download_binaries(mut self, download_binaries: bool) -> Self {
125        self.download_binaries = download_binaries;
126        self
127    }
128
129    /// Apply [InfluxiveChildSvcConfig::influxd_path].
130    pub fn with_influxd_path(
131        mut self,
132        influxd_path: Option<std::path::PathBuf>,
133    ) -> Self {
134        self.influxd_path = influxd_path;
135        self
136    }
137
138    /// Apply [InfluxiveChildSvcConfig::influx_path].
139    pub fn with_influx_path(
140        mut self,
141        influx_path: Option<std::path::PathBuf>,
142    ) -> Self {
143        self.influx_path = influx_path;
144        self
145    }
146
147    /// Apply [InfluxiveChildSvcConfig::database_path].
148    pub fn with_database_path(
149        mut self,
150        database_path: Option<std::path::PathBuf>,
151    ) -> Self {
152        self.database_path = database_path;
153        self
154    }
155
156    /// Apply [InfluxiveChildSvcConfig::user].
157    pub fn with_user(mut self, user: String) -> Self {
158        self.user = user;
159        self
160    }
161
162    /// Apply [InfluxiveChildSvcConfig::pass].
163    pub fn with_pass(mut self, pass: String) -> Self {
164        self.pass = pass;
165        self
166    }
167
168    /// Apply [InfluxiveChildSvcConfig::org].
169    pub fn with_org(mut self, org: String) -> Self {
170        self.org = org;
171        self
172    }
173
174    /// Apply [InfluxiveChildSvcConfig::bucket].
175    pub fn with_bucket(mut self, bucket: String) -> Self {
176        self.bucket = bucket;
177        self
178    }
179
180    /// Apply [InfluxiveChildSvcConfig::retention].
181    pub fn with_retention(mut self, retention: String) -> Self {
182        self.retention = retention;
183        self
184    }
185
186    /// Apply [InfluxiveChildSvcConfig::metric_write].
187    pub fn with_metric_write(
188        mut self,
189        metric_write: InfluxiveWriterConfig,
190    ) -> Self {
191        self.metric_write = metric_write;
192        self
193    }
194}
195
196/// A running child-process instance of influxd.
197/// Command and control functions are provided through the influx cli tool.
198/// Metric writing is provided through the http line protocol.
199pub struct InfluxiveChildSvc {
200    config: InfluxiveChildSvcConfig,
201    host: String,
202    token: String,
203    child: std::sync::Mutex<Option<tokio::process::Child>>,
204    influx_path: std::path::PathBuf,
205    writer: InfluxiveWriter,
206}
207
208impl InfluxiveChildSvc {
209    /// Spawn a new influxd child process.
210    pub async fn new(config: InfluxiveChildSvcConfig) -> Result<Self> {
211        let db_path = config.database_path.clone().unwrap_or_else(|| {
212            let mut db_path = std::path::PathBuf::from(".");
213            db_path.push("influxive");
214            db_path
215        });
216
217        tokio::fs::create_dir_all(&db_path).await?;
218
219        let influxd_path = validate_influx(&db_path, &config, false).await?;
220
221        let influx_path = validate_influx(&db_path, &config, true).await?;
222
223        let (child, port) = spawn_influxd(&db_path, &influxd_path).await?;
224
225        let host = format!("http://127.0.0.1:{port}");
226
227        let mut configs_path = std::path::PathBuf::from(&db_path);
228        configs_path.push("configs");
229
230        if let Err(err) = cmd_output!(
231            &influx_path,
232            "setup",
233            "--json",
234            "--configs-path",
235            &configs_path,
236            "--host",
237            &host,
238            "--username",
239            &config.user,
240            "--password",
241            &config.pass,
242            "--org",
243            &config.org,
244            "--bucket",
245            &config.bucket,
246            "--retention",
247            &config.retention,
248            "--force"
249        ) {
250            let repr = format!("{err:?}");
251            if !repr.contains("Error: instance has already been set up") {
252                return Err(err);
253            }
254        }
255
256        let token = tokio::fs::read(&configs_path).await?;
257        let token = String::from_utf8_lossy(&token);
258        let mut token = token.split("token = \"");
259        token.next().unwrap();
260        let token = token.next().unwrap();
261        let mut token = token.split('\"');
262        let token = token.next().unwrap().to_string();
263
264        let writer = InfluxiveWriter::with_token_auth(
265            config.metric_write.clone(),
266            &host,
267            &config.bucket,
268            &token,
269        );
270
271        let bucket = config.bucket.clone();
272
273        let this = Self {
274            config,
275            host,
276            token,
277            child: std::sync::Mutex::new(Some(child)),
278            influx_path,
279            writer,
280        };
281
282        let mut millis = 10;
283
284        for _ in 0..10 {
285            // this ensures the db / bucket is created + ready to go
286            this.write_metric(
287                Metric::new(std::time::SystemTime::now(), "influxive.start")
288                    .with_field("value", true),
289            );
290
291            if let Ok(result) = this
292                .query(format!(
293                    r#"from(bucket: "{bucket}")
294|> range(start: -15m, stop: now())
295|> filter(fn: (r) => r["_measurement"] == "influxive.start")
296|> filter(fn: (r) => r["_field"] == "value")"#,
297                ))
298                .await
299            {
300                if result.split('\n').count() >= 3 {
301                    return Ok(this);
302                }
303            }
304
305            tokio::time::sleep(std::time::Duration::from_millis(millis)).await;
306            millis *= 2;
307        }
308
309        Err(err_other("Unable to start influxd"))
310    }
311
312    /// Shut down the child process. Further calls to it should error.
313    pub fn shutdown(&self) {
314        drop(self.child.lock().unwrap().take());
315    }
316
317    /// Get the config this instance was created with.
318    pub fn get_config(&self) -> &InfluxiveChildSvcConfig {
319        &self.config
320    }
321
322    /// Get the host url of this running influxd instance.
323    pub fn get_host(&self) -> &str {
324        &self.host
325    }
326
327    /// Get the operator token of this running influxd instance.
328    pub fn get_token(&self) -> &str {
329        &self.token
330    }
331
332    /// "Ping" the running InfluxDB instance, returning the result.
333    pub async fn ping(&self) -> Result<()> {
334        cmd_output!(&self.influx_path, "ping", "--host", &self.host)?;
335        Ok(())
336    }
337
338    /// Run a query against the running InfluxDB instance.
339    /// Note, if you are writing metrics, prefer the 'write_metric' api
340    /// as that will be more efficient.
341    pub async fn query<Q: Into<StringType>>(
342        &self,
343        flux_query: Q,
344    ) -> Result<String> {
345        cmd_output!(
346            &self.influx_path,
347            "query",
348            "--raw",
349            "--org",
350            &self.config.org,
351            "--host",
352            &self.host,
353            "--token",
354            &self.token,
355            flux_query.into().into_string()
356        )
357    }
358
359    /// List the existing dashboard data in the running InfluxDB instance.
360    pub async fn list_dashboards(&self) -> Result<String> {
361        cmd_output!(
362            &self.influx_path,
363            "dashboards",
364            "--org",
365            &self.config.org,
366            "--host",
367            &self.host,
368            "--token",
369            &self.token,
370            "--json"
371        )
372    }
373
374    /// Apply a template to the running InfluxDB instance.
375    pub async fn apply(&self, template: &[u8]) -> Result<String> {
376        use tokio::io::AsyncWriteExt;
377
378        let (file, tmp) = tempfile::Builder::new()
379            .suffix(".json")
380            .tempfile()?
381            .into_parts();
382        let mut file = tokio::fs::File::from_std(file);
383
384        file.write_all(template).await?;
385        file.shutdown().await?;
386
387        let result = cmd_output!(
388            &self.influx_path,
389            "apply",
390            "--org",
391            &self.config.org,
392            "--host",
393            &self.host,
394            "--token",
395            &self.token,
396            "--json",
397            "--force",
398            "yes",
399            "--file",
400            &tmp
401        );
402
403        drop(file);
404
405        // Okay if this fails on windows
406        let _ = tmp.close();
407
408        result
409    }
410
411    /// Log a metric to the running InfluxDB instance.
412    /// Note, this function itself is an efficiency abstraction,
413    /// which will return quickly if there is space in the buffer.
414    /// The actual call to log the metrics will be made a configurable
415    /// timespan later to facilitate batching of metric writes.
416    pub fn write_metric(&self, metric: Metric) {
417        self.writer.write_metric(metric);
418    }
419}
420
421impl MetricWriter for InfluxiveChildSvc {
422    fn write_metric(&self, metric: Metric) {
423        InfluxiveChildSvc::write_metric(self, metric);
424    }
425}
426
427#[cfg(feature = "download_binaries")]
428async fn dl_influx(
429    _db_path: &std::path::Path,
430    is_cli: bool,
431    bin_path: &mut std::path::PathBuf,
432    err_list: &mut Vec<std::io::Error>,
433) -> Option<String> {
434    let spec = if is_cli {
435        &download_binaries::DL_CLI
436    } else {
437        &download_binaries::DL_DB
438    };
439
440    if let Some(spec) = &spec {
441        match spec.download(_db_path).await {
442            Ok(path) => {
443                *bin_path = path;
444                match cmd_output!(&bin_path, "version") {
445                    Ok(ver) => return Some(ver),
446                    Err(err) => {
447                        err_list.push(err_other(format!(
448                            "failed to run {bin_path:?}"
449                        )));
450                        err_list.push(err);
451                    }
452                }
453            }
454            Err(err) => {
455                err_list.push(err_other("failed to download"));
456                err_list.push(err);
457            }
458        }
459    } else {
460        err_list
461            .push(err_other("no download configured for this target os/arch"));
462    }
463
464    None
465}
466
467async fn validate_influx(
468    _db_path: &std::path::Path,
469    config: &InfluxiveChildSvcConfig,
470    is_cli: bool,
471) -> Result<std::path::PathBuf> {
472    let mut bin_path = if is_cli {
473        "influx".into()
474    } else {
475        "influxd".into()
476    };
477
478    if is_cli {
479        if let Some(path) = &config.influx_path {
480            bin_path = path.clone();
481        }
482    } else if let Some(path) = &config.influxd_path {
483        bin_path = path.clone();
484    };
485
486    let ver = match cmd_output!(&bin_path, "version") {
487        Ok(ver) => ver,
488        Err(err) => {
489            let mut err_list = Vec::new();
490            err_list.push(err_other(format!("failed to run {bin_path:?}")));
491            err_list.push(err);
492
493            #[cfg(feature = "download_binaries")]
494            {
495                if let Some(ver) =
496                    dl_influx(_db_path, is_cli, &mut bin_path, &mut err_list)
497                        .await
498                {
499                    ver
500                } else {
501                    return Err(err_other(format!("{err_list:?}",)));
502                }
503            }
504
505            #[cfg(not(feature = "download_binaries"))]
506            {
507                return Err(err_other(format!("{err_list:?}",)));
508            }
509        }
510    };
511
512    // alas, the cli prints out the unhelpful version "dev".
513    if is_cli && !ver.contains("build_date: 2023-04-28") {
514        return Err(err_other(format!("invalid build_date: {ver}")));
515    } else if !is_cli && !ver.contains("InfluxDB v2.7.6") {
516        return Err(err_other(format!("invalid version: {ver}")));
517    }
518
519    Ok(bin_path)
520}
521
522async fn spawn_influxd(
523    db_path: &std::path::Path,
524    influxd_path: &std::path::Path,
525) -> Result<(tokio::process::Child, u16)> {
526    use tokio::io::AsyncBufReadExt;
527
528    let (s, r) = tokio::sync::oneshot::channel();
529
530    let mut s = Some(s);
531
532    let mut engine_path = std::path::PathBuf::from(db_path);
533    engine_path.push("engine");
534    let mut bolt_path = std::path::PathBuf::from(db_path);
535    bolt_path.push("influxd.bolt");
536
537    let mut proc = tokio::process::Command::new(influxd_path);
538    proc.kill_on_drop(true);
539    proc.arg("--engine-path").arg(engine_path);
540    proc.arg("--bolt-path").arg(bolt_path);
541    proc.arg("--http-bind-address").arg("127.0.0.1:0");
542    proc.arg("--metrics-disabled");
543    proc.arg("--reporting-disabled");
544    proc.stdout(std::process::Stdio::piped());
545
546    let proc_err = format!("{proc:?}");
547
548    let mut child = proc
549        .spawn()
550        .map_err(|err| err_other(format!("{proc_err}: {err:?}")))?;
551
552    let stdout = child.stdout.take().unwrap();
553    let mut reader = tokio::io::BufReader::new(stdout).lines();
554
555    tokio::task::spawn(async move {
556        while let Some(line) = reader.next_line().await.expect("got line") {
557            tracing::trace!(?line, "influxd stdout");
558            if line.contains("msg=Listening")
559                && line.contains("service=tcp-listener")
560                && line.contains("transport=http")
561            {
562                let mut iter = line.split(" port=");
563                iter.next().unwrap();
564                let item = iter.next().unwrap();
565                let port: u16 = item.parse().unwrap();
566                if let Some(s) = s.take() {
567                    let _ = s.send(port);
568                }
569            }
570        }
571    });
572
573    let port = r.await.map_err(|_| err_other("Failed to scrape port"))?;
574
575    Ok((child, port))
576}
577
578#[cfg(test)]
579mod test;