auxon_sdk/plugin_utils/
mod.rs1pub mod config;
4pub mod serde;
5
6#[cfg(feature = "modality")]
7pub mod ingest;
8
9#[cfg(feature = "deviant")]
10pub mod mutation;
11
12use crate::api::types::{AttrKey, AttrVal};
13use crate::auth_token::{self, AuthToken, MODALITY_AUTH_TOKEN_ENV_VAR};
14use crate::reflector_config::{self, AttrKeyEqValuePair, ConfigLoadError, TopLevelIngest};
15use clap::Parser;
16use std::collections::BTreeMap;
17use std::future::Future;
18use std::path::{Path, PathBuf};
19use std::pin::Pin;
20use std::str::FromStr;
21use url::Url;
22
23pub const MODALITY_STORAGE_SERVICE_PORT_DEFAULT: u16 = 14182;
24
25pub const CLI_TEMPLATE: &str = "\
26 {about}\n\n\
27 USAGE:\n {usage}\n\
28 \n\
29 {all-args}\
30 ";
31
32#[deprecated]
46pub fn server_main<Opts, ServerFuture, ServerConstructor>(
47 server_constructor: ServerConstructor,
48) -> i32
49where
50 Opts: Parser,
51 Opts: BearingConfigFilePath,
52 ServerFuture: Future<Output = Result<(), Box<dyn std::error::Error + 'static>>> + 'static,
53 ServerConstructor: FnOnce(
54 reflector_config::Config,
55 AuthToken,
56 Opts,
57 Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
58 ) -> ServerFuture,
59{
60 let _ = reset_signal_pipe_handler();
61 let opts = match Opts::try_parse_from(std::env::args()) {
62 Ok(opts) => opts,
63 Err(e)
64 if e.kind() == clap::error::ErrorKind::DisplayHelp
65 || e.kind() == clap::error::ErrorKind::DisplayVersion =>
66 {
67 if let Err(e) = e.print() {
69 error_print(&e);
70 return exitcode::SOFTWARE;
71 }
72 return exitcode::OK;
73 }
74 Err(e) => {
75 error_print(&e);
76 return exitcode::SOFTWARE;
77 }
78 };
79
80 let config = if let Some(config_file) = opts.config_file_path() {
81 match reflector_config::try_from_file(config_file) {
82 Ok(c) => c,
83 Err(config_load_error) => {
84 tracing::error!(
86 err = &config_load_error as &dyn std::error::Error,
87 "Failed to load config file provided by command line args, exiting."
88 );
89 let exit_code = match &config_load_error {
90 ConfigLoadError::Io(_) => exitcode::IOERR,
91 _ => exitcode::CONFIG,
92 };
93 error_print(&config_load_error);
94 return exit_code;
95 }
96 }
97 } else if let Ok(config_file) = std::env::var(reflector_config::CONFIG_ENV_VAR) {
98 match reflector_config::try_from_file(&PathBuf::from(config_file)) {
99 Ok(c) => c,
100 Err(config_load_error) => {
101 tracing::error!(
103 err = &config_load_error as &dyn std::error::Error,
104 "Failed to load config file provided by environment variable, exiting."
105 );
106 let exit_code = match &config_load_error {
107 ConfigLoadError::Io(_) => exitcode::IOERR,
108 _ => exitcode::CONFIG,
109 };
110 error_print(&config_load_error);
111 return exit_code;
112 }
113 }
114 } else {
115 tracing::warn!("No config file specified, using default configuration.");
117 reflector_config::Config::default()
118 };
119
120 #[cfg(feature = "modality_tracing")]
122 let maybe_modality = {
123 let mut modality_tracing_options = crate::tracing::Options::default();
124 let maybe_preferred_ingest_parent_socket = if let Some(ingest_parent_url) = config
125 .ingest
126 .as_ref()
127 .and_then(|ing| ing.protocol_parent_url.as_ref())
128 {
129 ingest_parent_url
130 .socket_addrs(|| Some(14182))
131 .ok()
132 .and_then(|sockets| sockets.into_iter().next())
133 } else {
134 None
135 };
136 if let Some(socket) = maybe_preferred_ingest_parent_socket {
137 modality_tracing_options = modality_tracing_options.with_server_address(socket);
138 }
139
140 use tracing_subscriber::layer::{Layer, SubscriberExt};
141
142 use tracing_subscriber::filter::{EnvFilter, LevelFilter};
143 let (disp, maybe_modality_ingest_handle) =
144 match crate::tracing::blocking::ModalityLayer::init_with_options(
145 modality_tracing_options,
146 ) {
147 Ok((modality_layer, modality_ingest_handle)) => {
148 (
150 tracing::Dispatch::new(
151 tracing_subscriber::Registry::default()
152 .with(
153 modality_layer.with_filter(
154 EnvFilter::builder()
155 .with_default_directive(LevelFilter::INFO.into())
156 .from_env_lossy(),
157 ),
158 )
159 .with(
160 tracing_subscriber::fmt::Layer::default().with_filter(
161 EnvFilter::builder()
162 .with_default_directive(LevelFilter::INFO.into())
163 .from_env_lossy(),
164 ),
165 ),
166 ),
167 Some(modality_ingest_handle),
168 )
169 }
170 Err(modality_init_err) => {
171 eprintln!("Modality tracing layer initialization error.");
172 error_print(&modality_init_err);
173 (
175 tracing::Dispatch::new(
176 tracing_subscriber::Registry::default().with(
177 tracing_subscriber::fmt::Layer::default().with_filter(
178 EnvFilter::builder()
179 .with_default_directive(LevelFilter::INFO.into())
180 .from_env_lossy(),
181 ),
182 ),
183 ),
184 None,
185 )
186 }
187 };
188
189 tracing::dispatcher::set_global_default(disp).expect("set global tracer");
190
191 maybe_modality_ingest_handle
192 };
193
194 let auth_token = if let Ok(auth_token_env_str) = std::env::var(MODALITY_AUTH_TOKEN_ENV_VAR) {
195 match auth_token::decode_auth_token_hex(auth_token_env_str.as_str()) {
196 Ok(at) => at,
197 Err(auth_token_deserialization_err) => {
198 tracing::error!(
199 err = &auth_token_deserialization_err as &dyn std::error::Error,
200 "Failed to interpret auth token provide by environment variable, exiting."
201 );
202 error_print(&auth_token_deserialization_err);
203 return exitcode::CONFIG;
204 }
205 }
206 } else {
207 tracing::warn!(
208 "No auth token provided by environment variable {}, falling back to empty auth token",
209 MODALITY_AUTH_TOKEN_ENV_VAR
210 );
211 AuthToken::from(vec![])
212 };
213
214 let runtime = tokio::runtime::Builder::new_multi_thread()
215 .enable_all()
216 .build()
217 .expect("Could not construct tokio runtime");
218
219 let ctrlc = tokio::signal::ctrl_c();
220 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
221 let server_done = server_constructor(
222 config,
223 auth_token,
224 opts,
225 Box::pin(async {
226 let _ = shutdown_rx.await.map_err(|_recv_err| {
227 tracing::error!("Shutdown signal channel unexpectedly closed early.");
228 });
229 }),
230 );
231
232 let mut maybe_shutdown_tx = Some(shutdown_tx);
233 let out_exit_code = runtime.block_on(async {
234 tokio::select! {
235 signal_result = ctrlc => {
236 match signal_result {
237 Ok(()) => {
238 if let Some(shutdown_tx) = maybe_shutdown_tx.take() {
239 let _ = shutdown_tx.send(());
240 }
241 tracing::info!("Received ctrl+c, exiting.");
242 exitcode::OK
243 },
244 Err(io_err) => {
245 if let Some(shutdown_tx) = maybe_shutdown_tx.take() {
246 let _ = shutdown_tx.send(());
247 }
248 error_print(&io_err);
249 tracing::error!("Failed to install ctrl+c handler, exiting.");
250 exitcode::IOERR
251 }
252 }
253 }
254 server_result = server_done => {
255 match server_result {
256 Ok(()) => {
257 tracing::info!("Done.");
258 exitcode::OK
259 },
260 Err(e) => {
261 tracing::error!("Server crashed early, exiting.");
262 error_print(e.as_ref());
263 exitcode::SOFTWARE
264 }
265 }
266 }
267 }
268 });
269 std::mem::drop(runtime);
273 #[cfg(feature = "modality_tracing")]
274 {
275 if let Some(modality_ingest_handle) = maybe_modality {
276 modality_ingest_handle.finish();
277 }
278 }
279 let _maybe_shutdown_tx = maybe_shutdown_tx;
280 out_exit_code
281}
282
283pub(crate) fn error_print(err: &dyn std::error::Error) {
284 fn print_err_node(err: &dyn std::error::Error) {
285 eprintln!("{err}");
286 }
287
288 print_err_node(err);
289
290 let mut cause = err.source();
291 while let Some(err) = cause {
292 eprint!("Caused by: ");
293 print_err_node(err);
294 cause = err.source();
295 }
296}
297
298fn reset_signal_pipe_handler() -> Result<(), Box<dyn std::error::Error>> {
302 #[cfg(target_family = "unix")]
303 {
304 use nix::sys::signal;
305
306 unsafe {
307 signal::signal(signal::Signal::SIGPIPE, signal::SigHandler::SigDfl)?;
308 }
309 }
310
311 Ok(())
312}
313
314pub trait BearingConfigFilePath {
315 fn config_file_path(&self) -> Option<&Path>;
316}
317
318pub fn merge_ingest_protocol_parent_url(
319 cli_provided: Option<&Url>,
320 cfg: &reflector_config::Config,
321) -> Url {
322 if let Some(parent_url) = cli_provided {
323 parent_url.clone()
324 } else if let Some(TopLevelIngest {
325 protocol_parent_url: Some(parent_url),
326 ..
327 }) = &cfg.ingest
328 {
329 parent_url.clone()
330 } else {
331 let fallback = Url::from_str("modality-ingest://127.0.0.1").unwrap();
332 tracing::warn!(
333 "Plugin falling back to an ingest protocol parent URL of {}",
334 &fallback
335 );
336 fallback
337 }
338}
339
340#[derive(Debug, thiserror::Error)]
341pub enum ProtocolParentError {
342 #[error("Failed to provide an ingest protocol parent URL.")]
343 IngestProtocolParentUrlMissing,
344
345 #[error("Failed to resolve ingest protocol parent URL to an address '{0}'.")]
346 IngestProtocolParentAddressResolution(Url),
347}
348
349pub fn merge_timeline_attrs(
350 cli_provided_attrs: &[AttrKeyEqValuePair],
351 cfg: &reflector_config::Config,
352) -> BTreeMap<AttrKey, AttrVal> {
353 let mut timeline_attrs = BTreeMap::new();
356
357 fn ensure_timeline_prefix(k: AttrKey) -> AttrKey {
358 if k.as_ref().starts_with("timeline.") {
359 k
360 } else if k.as_ref().starts_with('.') {
361 AttrKey::from("timeline".to_owned() + k.as_ref())
362 } else {
363 AttrKey::from("timeline.".to_owned() + k.as_ref())
364 }
365 }
366 if let Some(tli) = &cfg.ingest {
367 for kvp in tli
368 .timeline_attributes
369 .additional_timeline_attributes
370 .iter()
371 .cloned()
372 {
373 let _ = timeline_attrs.insert(ensure_timeline_prefix(kvp.0), kvp.1);
374 }
375 for kvp in tli
376 .timeline_attributes
377 .override_timeline_attributes
378 .iter()
379 .cloned()
380 {
381 let _ = timeline_attrs.insert(ensure_timeline_prefix(kvp.0), kvp.1);
382 }
383 }
384 for kvp in cli_provided_attrs.iter().cloned() {
386 let _ = timeline_attrs.insert(ensure_timeline_prefix(kvp.0), kvp.1);
387 }
388 timeline_attrs
389}
390
391#[macro_export]
395macro_rules! init_tracing {
396 () => {
397 let builder = ::tracing_subscriber::fmt::Subscriber::builder();
398 let env_filter = ::std::env::var(tracing_subscriber::EnvFilter::DEFAULT_ENV)
399 .map(::tracing_subscriber::EnvFilter::new)
400 .unwrap_or_else(|_| {
401 ::tracing_subscriber::EnvFilter::new(format!(
402 "{}={}",
403 env!("CARGO_PKG_NAME").replace('-', "_"),
404 ::tracing::Level::INFO
405 ))
406 });
407 let builder = builder.with_env_filter(env_filter);
408 let subscriber = builder.finish();
409 use ::tracing_subscriber::util::SubscriberInitExt;
410 subscriber
411 .try_init()
412 .expect("Unable to initialize tracing subscriber");
413 };
414 ($env_filter:expr) => {
415 let builder = ::tracing_subscriber::fmt::Subscriber::builder();
416 let env_filter = ::std::env::var(tracing_subscriber::EnvFilter::DEFAULT_ENV)
417 .map(::tracing_subscriber::EnvFilter::new)
418 .unwrap_or_else(|_| $env_filter);
419 let builder = builder.with_env_filter(env_filter);
420 let subscriber = builder.finish();
421 use ::tracing_subscriber::util::SubscriberInitExt;
422 subscriber
423 .try_init()
424 .expect("Unable to initialize tracing subscriber");
425 };
426}