// auxon_sdk/plugin_utils/mod.rs

1//! Various helpful utilities for writing modality-reflector plugins.
2
3pub mod config;
4pub mod serde;
5
6#[cfg(feature = "modality")]
7pub mod ingest;
8
9#[cfg(feature = "deviant")]
10pub mod mutation;
11
12use crate::api::types::{AttrKey, AttrVal};
13use crate::auth_token::{self, AuthToken, MODALITY_AUTH_TOKEN_ENV_VAR};
14use crate::reflector_config::{self, AttrKeyEqValuePair, ConfigLoadError, TopLevelIngest};
15use clap::Parser;
16use std::collections::BTreeMap;
17use std::future::Future;
18use std::path::{Path, PathBuf};
19use std::pin::Pin;
20use std::str::FromStr;
21use url::Url;
22
/// Default port of the Modality storage service.
pub const MODALITY_STORAGE_SERVICE_PORT_DEFAULT: u16 = 14182;
24
/// Help-text template with `{about}`, `{usage}` and `{all-args}` placeholders.
///
/// The line continuations strip the source indentation, so the resulting string is
/// `"{about}\n\nUSAGE:\n    {usage}\n\n{all-args}"`.
// NOTE(review): presumably meant for clap's `help_template` — confirm against the plugin callers.
pub const CLI_TEMPLATE: &str = "\
            {about}\n\n\
            USAGE:\n    {usage}\n\
            \n\
            {all-args}\
        ";
31
32/// Handles boilerplate setup for:
33/// * tracing_subscriber configuration
34/// * Signal pipe fixup
35/// * Printing out errors
36/// * Exit code management
37///
38/// The server constructor function consumes config, custom cli args, and a shutdown signal future,
39/// then returns an indefinitely-running future that represents the server.
40///
41/// This function blocks waiting for either the constructed server future to finish
42/// or a CTRL+C style signal.
43///
44/// Returns the process's desired exit code.
45#[deprecated]
46pub fn server_main<Opts, ServerFuture, ServerConstructor>(
47    server_constructor: ServerConstructor,
48) -> i32
49where
50    Opts: Parser,
51    Opts: BearingConfigFilePath,
52    ServerFuture: Future<Output = Result<(), Box<dyn std::error::Error + 'static>>> + 'static,
53    ServerConstructor: FnOnce(
54        reflector_config::Config,
55        AuthToken,
56        Opts,
57        Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
58    ) -> ServerFuture,
59{
60    let _ = reset_signal_pipe_handler();
61    let opts = match Opts::try_parse_from(std::env::args()) {
62        Ok(opts) => opts,
63        Err(e)
64            if e.kind() == clap::error::ErrorKind::DisplayHelp
65                || e.kind() == clap::error::ErrorKind::DisplayVersion =>
66        {
67            // Need to print to stdout for these command variants in support of manual generation
68            if let Err(e) = e.print() {
69                error_print(&e);
70                return exitcode::SOFTWARE;
71            }
72            return exitcode::OK;
73        }
74        Err(e) => {
75            error_print(&e);
76            return exitcode::SOFTWARE;
77        }
78    };
79
80    let config = if let Some(config_file) = opts.config_file_path() {
81        match reflector_config::try_from_file(config_file) {
82            Ok(c) => c,
83            Err(config_load_error) => {
84                // N.B. tracing subscriber is not configured yet, this may disappear
85                tracing::error!(
86                    err = &config_load_error as &dyn std::error::Error,
87                    "Failed to load config file provided by command line args, exiting."
88                );
89                let exit_code = match &config_load_error {
90                    ConfigLoadError::Io(_) => exitcode::IOERR,
91                    _ => exitcode::CONFIG,
92                };
93                error_print(&config_load_error);
94                return exit_code;
95            }
96        }
97    } else if let Ok(config_file) = std::env::var(reflector_config::CONFIG_ENV_VAR) {
98        match reflector_config::try_from_file(&PathBuf::from(config_file)) {
99            Ok(c) => c,
100            Err(config_load_error) => {
101                // N.B. tracing subscriber is not configured yet, this may disappear
102                tracing::error!(
103                    err = &config_load_error as &dyn std::error::Error,
104                    "Failed to load config file provided by environment variable, exiting."
105                );
106                let exit_code = match &config_load_error {
107                    ConfigLoadError::Io(_) => exitcode::IOERR,
108                    _ => exitcode::CONFIG,
109                };
110                error_print(&config_load_error);
111                return exit_code;
112            }
113        }
114    } else {
115        // N.B. tracing subscriber is not configured yet, this may disappear
116        tracing::warn!("No config file specified, using default configuration.");
117        reflector_config::Config::default()
118    };
119
120    // setup custom tracer including ModalityLayer
121    #[cfg(feature = "modality_tracing")]
122    let maybe_modality = {
123        let mut modality_tracing_options = crate::tracing::Options::default();
124        let maybe_preferred_ingest_parent_socket = if let Some(ingest_parent_url) = config
125            .ingest
126            .as_ref()
127            .and_then(|ing| ing.protocol_parent_url.as_ref())
128        {
129            ingest_parent_url
130                .socket_addrs(|| Some(14182))
131                .ok()
132                .and_then(|sockets| sockets.into_iter().next())
133        } else {
134            None
135        };
136        if let Some(socket) = maybe_preferred_ingest_parent_socket {
137            modality_tracing_options = modality_tracing_options.with_server_address(socket);
138        }
139
140        use tracing_subscriber::layer::{Layer, SubscriberExt};
141
142        use tracing_subscriber::filter::{EnvFilter, LevelFilter};
143        let (disp, maybe_modality_ingest_handle) =
144            match crate::tracing::blocking::ModalityLayer::init_with_options(
145                modality_tracing_options,
146            ) {
147                Ok((modality_layer, modality_ingest_handle)) => {
148                    // Trace output through both the stdout formatter and modality's ingest pipeline
149                    (
150                        tracing::Dispatch::new(
151                            tracing_subscriber::Registry::default()
152                                .with(
153                                    modality_layer.with_filter(
154                                        EnvFilter::builder()
155                                            .with_default_directive(LevelFilter::INFO.into())
156                                            .from_env_lossy(),
157                                    ),
158                                )
159                                .with(
160                                    tracing_subscriber::fmt::Layer::default().with_filter(
161                                        EnvFilter::builder()
162                                            .with_default_directive(LevelFilter::INFO.into())
163                                            .from_env_lossy(),
164                                    ),
165                                ),
166                        ),
167                        Some(modality_ingest_handle),
168                    )
169                }
170                Err(modality_init_err) => {
171                    eprintln!("Modality tracing layer initialization error.");
172                    error_print(&modality_init_err);
173                    // Only do trace output through the stdout formatter
174                    (
175                        tracing::Dispatch::new(
176                            tracing_subscriber::Registry::default().with(
177                                tracing_subscriber::fmt::Layer::default().with_filter(
178                                    EnvFilter::builder()
179                                        .with_default_directive(LevelFilter::INFO.into())
180                                        .from_env_lossy(),
181                                ),
182                            ),
183                        ),
184                        None,
185                    )
186                }
187            };
188
189        tracing::dispatcher::set_global_default(disp).expect("set global tracer");
190
191        maybe_modality_ingest_handle
192    };
193
194    let auth_token = if let Ok(auth_token_env_str) = std::env::var(MODALITY_AUTH_TOKEN_ENV_VAR) {
195        match auth_token::decode_auth_token_hex(auth_token_env_str.as_str()) {
196            Ok(at) => at,
197            Err(auth_token_deserialization_err) => {
198                tracing::error!(
199                    err = &auth_token_deserialization_err as &dyn std::error::Error,
200                    "Failed to interpret auth token provide by environment variable, exiting."
201                );
202                error_print(&auth_token_deserialization_err);
203                return exitcode::CONFIG;
204            }
205        }
206    } else {
207        tracing::warn!(
208            "No auth token provided by environment variable {}, falling back to empty auth token",
209            MODALITY_AUTH_TOKEN_ENV_VAR
210        );
211        AuthToken::from(vec![])
212    };
213
214    let runtime = tokio::runtime::Builder::new_multi_thread()
215        .enable_all()
216        .build()
217        .expect("Could not construct tokio runtime");
218
219    let ctrlc = tokio::signal::ctrl_c();
220    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
221    let server_done = server_constructor(
222        config,
223        auth_token,
224        opts,
225        Box::pin(async {
226            let _ = shutdown_rx.await.map_err(|_recv_err| {
227                tracing::error!("Shutdown signal channel unexpectedly closed early.");
228            });
229        }),
230    );
231
232    let mut maybe_shutdown_tx = Some(shutdown_tx);
233    let out_exit_code = runtime.block_on(async {
234        tokio::select! {
235            signal_result = ctrlc => {
236                match signal_result {
237                    Ok(()) => {
238                        if let Some(shutdown_tx) = maybe_shutdown_tx.take() {
239                            let _ = shutdown_tx.send(());
240                        }
241                        tracing::info!("Received ctrl+c, exiting.");
242                        exitcode::OK
243                    },
244                    Err(io_err) => {
245                        if let Some(shutdown_tx) = maybe_shutdown_tx.take() {
246                            let _ = shutdown_tx.send(());
247                        }
248                        error_print(&io_err);
249                        tracing::error!("Failed to install ctrl+c handler, exiting.");
250                        exitcode::IOERR
251                    }
252                }
253            }
254            server_result = server_done => {
255                match server_result {
256                    Ok(()) => {
257                        tracing::info!("Done.");
258                        exitcode::OK
259                    },
260                    Err(e) => {
261                        tracing::error!("Server crashed early, exiting.");
262                        error_print(e.as_ref());
263                        exitcode::SOFTWARE
264                    }
265                }
266            }
267        }
268    });
269    // Drop the runtime a little ahead of function exit
270    // in order to ensure that the shutdown_tx side of
271    // the shutdown signal channel does not drop first.
272    std::mem::drop(runtime);
273    #[cfg(feature = "modality_tracing")]
274    {
275        if let Some(modality_ingest_handle) = maybe_modality {
276            modality_ingest_handle.finish();
277        }
278    }
279    let _maybe_shutdown_tx = maybe_shutdown_tx;
280    out_exit_code
281}
282
/// Prints `err` to stderr, followed by one `Caused by: …` line for every error reachable
/// through its `source()` chain.
///
/// Failures to write to stderr are ignored: this runs on error/exit paths, where a panic
/// from a closed stderr would only hide the error being reported.
pub(crate) fn error_print(err: &dyn std::error::Error) {
    use std::io::Write;

    // Lock once so the lines of one chain are not interleaved with other writers.
    let stderr = std::io::stderr();
    let mut out = stderr.lock();

    let _ = writeln!(out, "{err}");
    for cause in std::iter::successors(err.source(), |&e| e.source()) {
        let _ = writeln!(out, "Caused by: {cause}");
    }
}
297
298// Used to prevent panics on broken pipes.
299// See:
300//   https://github.com/rust-lang/rust/issues/46016#issuecomment-605624865
301fn reset_signal_pipe_handler() -> Result<(), Box<dyn std::error::Error>> {
302    #[cfg(target_family = "unix")]
303    {
304        use nix::sys::signal;
305
306        unsafe {
307            signal::signal(signal::Signal::SIGPIPE, signal::SigHandler::SigDfl)?;
308        }
309    }
310
311    Ok(())
312}
313
/// Implemented by command line option types that may carry a reflector config file path.
///
/// `server_main` requires this of its `Opts` type so it can locate the config file to load.
pub trait BearingConfigFilePath {
    /// Returns the config file path supplied by the user, or `None` when none was given.
    fn config_file_path(&self) -> Option<&Path>;
}
317
318pub fn merge_ingest_protocol_parent_url(
319    cli_provided: Option<&Url>,
320    cfg: &reflector_config::Config,
321) -> Url {
322    if let Some(parent_url) = cli_provided {
323        parent_url.clone()
324    } else if let Some(TopLevelIngest {
325        protocol_parent_url: Some(parent_url),
326        ..
327    }) = &cfg.ingest
328    {
329        parent_url.clone()
330    } else {
331        let fallback = Url::from_str("modality-ingest://127.0.0.1").unwrap();
332        tracing::warn!(
333            "Plugin falling back to an ingest protocol parent URL of {}",
334            &fallback
335        );
336        fallback
337    }
338}
339
340#[derive(Debug, thiserror::Error)]
341pub enum ProtocolParentError {
342    #[error("Failed to provide an ingest protocol parent URL.")]
343    IngestProtocolParentUrlMissing,
344
345    #[error("Failed to resolve ingest protocol parent URL to an address '{0}'.")]
346    IngestProtocolParentAddressResolution(Url),
347}
348
349pub fn merge_timeline_attrs(
350    cli_provided_attrs: &[AttrKeyEqValuePair],
351    cfg: &reflector_config::Config,
352) -> BTreeMap<AttrKey, AttrVal> {
353    // Merge additional and override timeline attrs from cfg and opts
354    // TODO deal with conflicting reserved attrs in #2098
355    let mut timeline_attrs = BTreeMap::new();
356
357    fn ensure_timeline_prefix(k: AttrKey) -> AttrKey {
358        if k.as_ref().starts_with("timeline.") {
359            k
360        } else if k.as_ref().starts_with('.') {
361            AttrKey::from("timeline".to_owned() + k.as_ref())
362        } else {
363            AttrKey::from("timeline.".to_owned() + k.as_ref())
364        }
365    }
366    if let Some(tli) = &cfg.ingest {
367        for kvp in tli
368            .timeline_attributes
369            .additional_timeline_attributes
370            .iter()
371            .cloned()
372        {
373            let _ = timeline_attrs.insert(ensure_timeline_prefix(kvp.0), kvp.1);
374        }
375        for kvp in tli
376            .timeline_attributes
377            .override_timeline_attributes
378            .iter()
379            .cloned()
380        {
381            let _ = timeline_attrs.insert(ensure_timeline_prefix(kvp.0), kvp.1);
382        }
383    }
384    // The CLI-provided attrs will take precedence over config
385    for kvp in cli_provided_attrs.iter().cloned() {
386        let _ = timeline_attrs.insert(ensure_timeline_prefix(kvp.0), kvp.1);
387    }
388    timeline_attrs
389}
390
/// Initialize the `tracing` crate with a `tracing_subscriber` fmt subscriber filtered by a
/// `tracing_subscriber::EnvFilter`.
///
/// The filter directives are read from the `EnvFilter::DEFAULT_ENV` environment variable
/// (`RUST_LOG`). If that variable cannot be read:
/// * `init_tracing!()` defaults to enabling 'info' for the crate the macro is invoked from
///   (its `CARGO_PKG_NAME`, with `-` replaced by `_`);
/// * `init_tracing!(filter)` uses the given `EnvFilter` expression instead, which is only
///   evaluated in that case.
///
/// Will panic if the tracing subscriber cannot be initialized.
#[macro_export]
macro_rules! init_tracing {
    () => {
        let builder = ::tracing_subscriber::fmt::Subscriber::builder();
        let env_filter = ::std::env::var(::tracing_subscriber::EnvFilter::DEFAULT_ENV)
            .map(::tracing_subscriber::EnvFilter::new)
            .unwrap_or_else(|_| {
                ::tracing_subscriber::EnvFilter::new(format!(
                    "{}={}",
                    env!("CARGO_PKG_NAME").replace('-', "_"),
                    ::tracing::Level::INFO
                ))
            });
        let builder = builder.with_env_filter(env_filter);
        let subscriber = builder.finish();
        use ::tracing_subscriber::util::SubscriberInitExt;
        subscriber
            .try_init()
            .expect("Unable to initialize tracing subscriber");
    };
    ($env_filter:expr) => {
        let builder = ::tracing_subscriber::fmt::Subscriber::builder();
        let env_filter = ::std::env::var(::tracing_subscriber::EnvFilter::DEFAULT_ENV)
            .map(::tracing_subscriber::EnvFilter::new)
            .unwrap_or_else(|_| $env_filter);
        let builder = builder.with_env_filter(env_filter);
        let subscriber = builder.finish();
        use ::tracing_subscriber::util::SubscriberInitExt;
        subscriber
            .try_init()
            .expect("Unable to initialize tracing subscriber");
    };
}