Skip to main content

nautilus_live/
plugin.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live-node plug-in support.
17//!
18//! The public exports mirror `nautilus_plugin::bridge` so existing
19//! `nautilus_live::plugin::*` imports keep working. The crate-local runtime
20//! pieces below load configured plug-ins, build node adapters, and manage
21//! plug-in controller lifecycle state.
22
23use std::path::Path;
24
25use ahash::AHashSet;
26use anyhow::Context;
27use aws_lc_rs::digest;
28use nautilus_core::hex;
29use nautilus_model::identifiers::{ActorId, StrategyId};
30pub use nautilus_plugin::bridge::*;
31use nautilus_plugin::loader::PluginLoader;
32use nautilus_trading::strategy::StrategyConfig;
33
34use crate::config::PluginConfig;
35
36#[derive(Debug, Default)]
37pub(crate) struct NodePlugins {
38    loader: Option<PluginLoader>,
39    controllers: Vec<PluginControllerAdapter>,
40    controllers_started: bool,
41}
42
43impl NodePlugins {
44    pub(crate) fn set_loader(&mut self, loader: PluginLoader) {
45        self.loader = Some(loader);
46    }
47
48    pub(crate) fn push_controller(&mut self, controller: PluginControllerAdapter) {
49        self.controllers.push(controller);
50    }
51
52    pub(crate) fn load_plugin(
53        &mut self,
54        config: &PluginConfig,
55    ) -> anyhow::Result<NodePluginAdapter> {
56        verify_plugin_sha256(config)?;
57
58        let entry = {
59            let loader = self.loader.get_or_insert_with(plugin_loader);
60            let path = Path::new(&config.path);
61            let already_loaded = loader.loaded().iter().any(|loaded| loaded.path() == path);
62
63            if !already_loaded {
64                let loaded = loader
65                    .load(&config.path)
66                    .with_context(|| format!("failed to load plug-in '{}'", config.path))?;
67                let registered = register_manifest_custom_data(loaded.validated_manifest())
68                    .with_context(|| {
69                        format!(
70                            "failed to register custom data from plug-in '{}'",
71                            loaded.path().display()
72                        )
73                    })?;
74
75                if registered > 0 {
76                    log::info!(
77                        "Registered {registered} custom data type(s) from plug-in {}",
78                        loaded.path().display()
79                    );
80                }
81            }
82
83            let loaded = loader
84                .loaded()
85                .iter()
86                .find(|loaded| loaded.path() == path)
87                .ok_or_else(|| anyhow::anyhow!("plug-in '{}' was not loaded", config.path))?;
88            configured_entry(loaded.validated_manifest(), &config.path, &config.type_name)?
89        };
90
91        configured_plugin_adapter(entry, config)
92    }
93
94    pub(crate) fn start_controllers(&mut self) -> anyhow::Result<()> {
95        if self.controllers_started {
96            return Ok(());
97        }
98
99        for index in 0..self.controllers.len() {
100            let result = {
101                let controller = &mut self.controllers[index];
102                controller.on_start().with_context(|| {
103                    format!(
104                        "failed to start plug-in controller '{}' from plug-in '{}'",
105                        controller.type_name(),
106                        controller.plugin_name()
107                    )
108                })
109            };
110
111            if let Err(start_err) = result {
112                for controller in self.controllers[..index].iter_mut().rev() {
113                    if let Err(stop_err) = controller.on_stop() {
114                        log::error!(
115                            "Failed to roll back plug-in controller '{}' from plug-in '{}': {stop_err}",
116                            controller.type_name(),
117                            controller.plugin_name()
118                        );
119                    }
120                }
121                return Err(start_err);
122            }
123        }
124
125        self.controllers_started = true;
126        Ok(())
127    }
128
129    pub(crate) fn stop_controllers(&mut self) -> anyhow::Result<()> {
130        if !self.controllers_started {
131            return Ok(());
132        }
133
134        let mut first_error = None;
135
136        for controller in self.controllers.iter_mut().rev() {
137            if let Err(e) = controller.on_stop().with_context(|| {
138                format!(
139                    "failed to stop plug-in controller '{}' from plug-in '{}'",
140                    controller.type_name(),
141                    controller.plugin_name()
142                )
143            }) {
144                log::error!("{e}");
145                if first_error.is_none() {
146                    first_error = Some(e);
147                }
148            }
149        }
150
151        self.controllers_started = false;
152
153        if let Some(e) = first_error {
154            Err(e)
155        } else {
156            Ok(())
157        }
158    }
159}
160
161#[derive(Debug)]
162pub(crate) enum NodePluginAdapter {
163    Actor(Box<PluginActorAdapter>),
164    Strategy(Box<PluginStrategyAdapter>),
165    Controller(PluginControllerAdapter),
166}
167
168#[derive(Debug)]
169pub(crate) struct NodePluginBatch {
170    loader: PluginLoader,
171    adapters: Vec<NodePluginAdapter>,
172}
173
174impl NodePluginBatch {
175    pub(crate) fn into_parts(self) -> (PluginLoader, Vec<NodePluginAdapter>) {
176        (self.loader, self.adapters)
177    }
178}
179
180pub(crate) fn load_configured_plugin_batch(
181    configs: &[PluginConfig],
182) -> anyhow::Result<NodePluginBatch> {
183    let mut loader = plugin_loader();
184    let mut loaded_paths = AHashSet::new();
185
186    for config in configs {
187        verify_plugin_sha256(config)?;
188        if loaded_paths.insert(config.path.clone()) {
189            loader
190                .load(&config.path)
191                .with_context(|| format!("failed to load plug-in '{}'", config.path))?;
192        }
193    }
194
195    for loaded in loader.loaded() {
196        let registered =
197            register_manifest_custom_data(loaded.validated_manifest()).with_context(|| {
198                format!(
199                    "failed to register custom data from plug-in '{}'",
200                    loaded.path().display()
201                )
202            })?;
203
204        if registered > 0 {
205            log::info!(
206                "Registered {registered} custom data type(s) from plug-in {}",
207                loaded.path().display()
208            );
209        }
210    }
211
212    let adapters = configs
213        .iter()
214        .map(|config| configured_plugin_adapter_from_loader(&loader, config))
215        .collect::<anyhow::Result<_>>()?;
216
217    Ok(NodePluginBatch { loader, adapters })
218}
219
220fn configured_plugin_adapter_from_loader(
221    loader: &PluginLoader,
222    config: &PluginConfig,
223) -> anyhow::Result<NodePluginAdapter> {
224    let loaded = loader
225        .loaded()
226        .iter()
227        .find(|loaded| loaded.path() == Path::new(&config.path))
228        .ok_or_else(|| anyhow::anyhow!("plug-in '{}' was not loaded", config.path))?;
229
230    let entry = configured_entry(loaded.validated_manifest(), &config.path, &config.type_name)?;
231    configured_plugin_adapter(entry, config)
232}
233
234fn configured_plugin_adapter(
235    entry: ConfiguredPluginEntry,
236    config: &PluginConfig,
237) -> anyhow::Result<NodePluginAdapter> {
238    let config_json = serde_json::to_string(&config.config)?;
239
240    match entry {
241        ConfiguredPluginEntry::Actor(entry) => {
242            let actor_id = plugin_actor_id(config)?;
243            let adapter = entry
244                .create_adapter(actor_id, &config_json)
245                .with_context(|| {
246                    format!(
247                        "failed to instantiate plug-in actor '{}' from {}",
248                        config.type_name, config.path
249                    )
250                })?;
251            Ok(NodePluginAdapter::Actor(Box::new(adapter)))
252        }
253        ConfiguredPluginEntry::Strategy(entry) => {
254            let strategy_config = plugin_strategy_config(config)?;
255            let adapter = entry
256                .create_adapter(strategy_config, &config_json)
257                .with_context(|| {
258                    format!(
259                        "failed to instantiate plug-in strategy '{}' from {}",
260                        config.type_name, config.path
261                    )
262                })?;
263            Ok(NodePluginAdapter::Strategy(Box::new(adapter)))
264        }
265        ConfiguredPluginEntry::Controller(entry) => {
266            let adapter = entry.create_adapter(&config_json).with_context(|| {
267                format!(
268                    "failed to instantiate plug-in controller '{}' from {}",
269                    config.type_name, config.path
270                )
271            })?;
272            Ok(NodePluginAdapter::Controller(adapter))
273        }
274    }
275}
276
277fn verify_plugin_sha256(config: &PluginConfig) -> anyhow::Result<()> {
278    let Some(expected) = &config.sha256 else {
279        return Ok(());
280    };
281
282    let bytes = std::fs::read(&config.path)
283        .with_context(|| format!("failed to read plug-in '{}'", config.path))?;
284    let actual = hex::encode(digest::digest(&digest::SHA256, &bytes).as_ref());
285    if actual.eq_ignore_ascii_case(expected) {
286        return Ok(());
287    }
288
289    anyhow::bail!(
290        "plug-in '{}' SHA-256 mismatch: expected {}, actual {}",
291        config.path,
292        expected,
293        actual
294    )
295}
296
297fn plugin_actor_id(config: &PluginConfig) -> anyhow::Result<ActorId> {
298    let actor_id = plugin_config_string(config, "actor_id")?.unwrap_or(&config.type_name);
299    ActorId::new_checked(actor_id)
300        .map_err(|e| anyhow::anyhow!("invalid actor_id for plug-in '{}': {e}", config.type_name))
301}
302
303fn plugin_strategy_config(config: &PluginConfig) -> anyhow::Result<StrategyConfig> {
304    let mut strategy_config = if let Some(value) = config.config.get("strategy_config") {
305        serde_json::from_value::<StrategyConfig>(value.clone()).with_context(|| {
306            format!(
307                "invalid strategy_config for plug-in strategy '{}'",
308                config.type_name
309            )
310        })?
311    } else {
312        StrategyConfig::default()
313    };
314
315    if strategy_config.strategy_id.is_none() {
316        let strategy_id = plugin_config_string(config, "strategy_id")?
317            .map_or_else(|| format!("{}-001", config.type_name), str::to_string);
318        strategy_config.strategy_id = Some(StrategyId::new_checked(&strategy_id).map_err(|e| {
319            anyhow::anyhow!(
320                "invalid strategy_id for plug-in strategy '{}': {e}",
321                config.type_name
322            )
323        })?);
324    }
325
326    if strategy_config.order_id_tag.is_none()
327        && let Some(order_id_tag) = plugin_config_string(config, "order_id_tag")?
328    {
329        strategy_config.order_id_tag = Some(order_id_tag.to_string());
330    }
331
332    Ok(strategy_config)
333}
334
335fn plugin_config_string<'a>(
336    config: &'a PluginConfig,
337    key: &'static str,
338) -> anyhow::Result<Option<&'a str>> {
339    match config.config.get(key) {
340        None | Some(serde_json::Value::Null) => Ok(None),
341        Some(serde_json::Value::String(value)) => Ok(Some(value.as_str())),
342        Some(_) => anyhow::bail!(
343            "plug-in '{}' config field '{key}' must be a string",
344            config.type_name
345        ),
346    }
347}
348
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, path::PathBuf};

    use nautilus_core::UUID4;
    use rstest::rstest;

    use super::*;

    /// Config for the digest tests: an `ExampleActor` at `path` with an empty
    /// config map and the given expected digest.
    fn digest_config(path: &Path, sha256: Option<String>) -> PluginConfig {
        PluginConfig {
            path: path.to_string_lossy().into_owned(),
            type_name: "ExampleActor".to_string(),
            config: HashMap::new(),
            sha256,
        }
    }

    /// Config for the ID/strategy tests: a placeholder library path, no
    /// digest, and the given type name and config map.
    fn library_config(
        type_name: &str,
        config: HashMap<String, serde_json::Value>,
    ) -> PluginConfig {
        PluginConfig {
            path: "./libexample.so".to_string(),
            type_name: type_name.to_string(),
            config,
            sha256: None,
        }
    }

    /// Writes `bytes` to a fresh temporary file and returns its path.
    fn write_plugin_bytes(bytes: &[u8]) -> PathBuf {
        let path = plugin_test_path();
        std::fs::write(&path, bytes).unwrap();
        path
    }

    /// Returns a unique path in the temp directory; the file is not created.
    fn plugin_test_path() -> PathBuf {
        let file_name = format!("nautilus-live-plugin-{}.bin", UUID4::new());
        std::env::temp_dir().join(file_name)
    }

    /// Hex-encoded SHA-256 digest of `bytes`.
    fn sha256_hex(bytes: &[u8]) -> String {
        let computed = digest::digest(&digest::SHA256, bytes);
        hex::encode(computed.as_ref())
    }

    #[rstest]
    fn test_verify_plugin_sha256_accepts_matching_uppercase_digest() {
        let contents = b"plugin bytes";
        let path = write_plugin_bytes(contents);
        let config = digest_config(&path, Some(sha256_hex(contents).to_uppercase()));

        let result = verify_plugin_sha256(&config);
        // Clean up before asserting so a failure does not leak the file.
        std::fs::remove_file(path).unwrap();

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_verify_plugin_sha256_rejects_mismatch() {
        let path = write_plugin_bytes(b"plugin bytes");
        let config = digest_config(&path, Some("0".repeat(64)));

        let error = verify_plugin_sha256(&config).unwrap_err().to_string();
        std::fs::remove_file(path).unwrap();

        assert!(error.contains("SHA-256 mismatch"));
    }

    #[rstest]
    fn test_verify_plugin_sha256_reports_missing_file() {
        // The path is generated but never written.
        let path = plugin_test_path();
        let config = digest_config(&path, Some("0".repeat(64)));

        let error = verify_plugin_sha256(&config).unwrap_err().to_string();

        assert!(error.contains("failed to read plug-in"));
    }

    #[rstest]
    fn test_verify_plugin_sha256_skips_missing_digest() {
        // No digest configured, so the missing file must not be read.
        let path = plugin_test_path();
        let config = digest_config(&path, None);

        assert!(verify_plugin_sha256(&config).is_ok());
    }

    #[rstest]
    fn test_plugin_actor_id_rejects_non_string_actor_id() {
        let fields = HashMap::from([("actor_id".to_string(), serde_json::json!(42))]);
        let config = library_config("ExampleActor", fields);

        let error = plugin_actor_id(&config).unwrap_err().to_string();

        assert!(error.contains("actor_id"));
        assert!(error.contains("must be a string"));
    }

    #[rstest]
    fn test_plugin_strategy_config_accepts_nested_strategy_config() {
        let fields = HashMap::from([(
            "strategy_config".to_string(),
            serde_json::json!({
                "strategy_id": "NestedStrategy-001",
                "order_id_tag": "NEST",
            }),
        )]);
        let config = library_config("ExampleStrategy", fields);

        let strategy_config = plugin_strategy_config(&config).unwrap();

        assert_eq!(
            strategy_config.strategy_id,
            Some(StrategyId::from("NestedStrategy-001"))
        );
        assert_eq!(strategy_config.order_id_tag.as_deref(), Some("NEST"));
    }

    #[rstest]
    fn test_plugin_strategy_config_uses_default_strategy_id() {
        let config = library_config("ExampleStrategy", HashMap::new());

        let strategy_config = plugin_strategy_config(&config).unwrap();

        assert_eq!(
            strategy_config.strategy_id,
            Some(StrategyId::from("ExampleStrategy-001"))
        );
    }

    #[rstest]
    fn test_plugin_strategy_config_uses_top_level_strategy_id_and_order_id_tag() {
        let fields = HashMap::from([
            (
                "strategy_id".to_string(),
                serde_json::json!("TopLevelStrategy-001"),
            ),
            ("order_id_tag".to_string(), serde_json::json!("TOP")),
        ]);
        let config = library_config("ExampleStrategy", fields);

        let strategy_config = plugin_strategy_config(&config).unwrap();

        assert_eq!(
            strategy_config.strategy_id,
            Some(StrategyId::from("TopLevelStrategy-001"))
        );
        assert_eq!(strategy_config.order_id_tag.as_deref(), Some("TOP"));
    }
}