1use std::path::Path;
24
25use ahash::AHashSet;
26use anyhow::Context;
27use aws_lc_rs::digest;
28use nautilus_core::hex;
29use nautilus_model::identifiers::{ActorId, StrategyId};
30pub use nautilus_plugin::bridge::*;
31use nautilus_plugin::loader::PluginLoader;
32use nautilus_trading::strategy::StrategyConfig;
33
34use crate::config::PluginConfig;
35
36#[derive(Debug, Default)]
37pub(crate) struct NodePlugins {
38 loader: Option<PluginLoader>,
39 controllers: Vec<PluginControllerAdapter>,
40 controllers_started: bool,
41}
42
43impl NodePlugins {
44 pub(crate) fn set_loader(&mut self, loader: PluginLoader) {
45 self.loader = Some(loader);
46 }
47
48 pub(crate) fn push_controller(&mut self, controller: PluginControllerAdapter) {
49 self.controllers.push(controller);
50 }
51
52 pub(crate) fn load_plugin(
53 &mut self,
54 config: &PluginConfig,
55 ) -> anyhow::Result<NodePluginAdapter> {
56 verify_plugin_sha256(config)?;
57
58 let entry = {
59 let loader = self.loader.get_or_insert_with(plugin_loader);
60 let path = Path::new(&config.path);
61 let already_loaded = loader.loaded().iter().any(|loaded| loaded.path() == path);
62
63 if !already_loaded {
64 let loaded = loader
65 .load(&config.path)
66 .with_context(|| format!("failed to load plug-in '{}'", config.path))?;
67 let registered = register_manifest_custom_data(loaded.validated_manifest())
68 .with_context(|| {
69 format!(
70 "failed to register custom data from plug-in '{}'",
71 loaded.path().display()
72 )
73 })?;
74
75 if registered > 0 {
76 log::info!(
77 "Registered {registered} custom data type(s) from plug-in {}",
78 loaded.path().display()
79 );
80 }
81 }
82
83 let loaded = loader
84 .loaded()
85 .iter()
86 .find(|loaded| loaded.path() == path)
87 .ok_or_else(|| anyhow::anyhow!("plug-in '{}' was not loaded", config.path))?;
88 configured_entry(loaded.validated_manifest(), &config.path, &config.type_name)?
89 };
90
91 configured_plugin_adapter(entry, config)
92 }
93
94 pub(crate) fn start_controllers(&mut self) -> anyhow::Result<()> {
95 if self.controllers_started {
96 return Ok(());
97 }
98
99 for index in 0..self.controllers.len() {
100 let result = {
101 let controller = &mut self.controllers[index];
102 controller.on_start().with_context(|| {
103 format!(
104 "failed to start plug-in controller '{}' from plug-in '{}'",
105 controller.type_name(),
106 controller.plugin_name()
107 )
108 })
109 };
110
111 if let Err(start_err) = result {
112 for controller in self.controllers[..index].iter_mut().rev() {
113 if let Err(stop_err) = controller.on_stop() {
114 log::error!(
115 "Failed to roll back plug-in controller '{}' from plug-in '{}': {stop_err}",
116 controller.type_name(),
117 controller.plugin_name()
118 );
119 }
120 }
121 return Err(start_err);
122 }
123 }
124
125 self.controllers_started = true;
126 Ok(())
127 }
128
129 pub(crate) fn stop_controllers(&mut self) -> anyhow::Result<()> {
130 if !self.controllers_started {
131 return Ok(());
132 }
133
134 let mut first_error = None;
135
136 for controller in self.controllers.iter_mut().rev() {
137 if let Err(e) = controller.on_stop().with_context(|| {
138 format!(
139 "failed to stop plug-in controller '{}' from plug-in '{}'",
140 controller.type_name(),
141 controller.plugin_name()
142 )
143 }) {
144 log::error!("{e}");
145 if first_error.is_none() {
146 first_error = Some(e);
147 }
148 }
149 }
150
151 self.controllers_started = false;
152
153 if let Some(e) = first_error {
154 Err(e)
155 } else {
156 Ok(())
157 }
158 }
159}
160
161#[derive(Debug)]
162pub(crate) enum NodePluginAdapter {
163 Actor(Box<PluginActorAdapter>),
164 Strategy(Box<PluginStrategyAdapter>),
165 Controller(PluginControllerAdapter),
166}
167
168#[derive(Debug)]
169pub(crate) struct NodePluginBatch {
170 loader: PluginLoader,
171 adapters: Vec<NodePluginAdapter>,
172}
173
174impl NodePluginBatch {
175 pub(crate) fn into_parts(self) -> (PluginLoader, Vec<NodePluginAdapter>) {
176 (self.loader, self.adapters)
177 }
178}
179
180pub(crate) fn load_configured_plugin_batch(
181 configs: &[PluginConfig],
182) -> anyhow::Result<NodePluginBatch> {
183 let mut loader = plugin_loader();
184 let mut loaded_paths = AHashSet::new();
185
186 for config in configs {
187 verify_plugin_sha256(config)?;
188 if loaded_paths.insert(config.path.clone()) {
189 loader
190 .load(&config.path)
191 .with_context(|| format!("failed to load plug-in '{}'", config.path))?;
192 }
193 }
194
195 for loaded in loader.loaded() {
196 let registered =
197 register_manifest_custom_data(loaded.validated_manifest()).with_context(|| {
198 format!(
199 "failed to register custom data from plug-in '{}'",
200 loaded.path().display()
201 )
202 })?;
203
204 if registered > 0 {
205 log::info!(
206 "Registered {registered} custom data type(s) from plug-in {}",
207 loaded.path().display()
208 );
209 }
210 }
211
212 let adapters = configs
213 .iter()
214 .map(|config| configured_plugin_adapter_from_loader(&loader, config))
215 .collect::<anyhow::Result<_>>()?;
216
217 Ok(NodePluginBatch { loader, adapters })
218}
219
220fn configured_plugin_adapter_from_loader(
221 loader: &PluginLoader,
222 config: &PluginConfig,
223) -> anyhow::Result<NodePluginAdapter> {
224 let loaded = loader
225 .loaded()
226 .iter()
227 .find(|loaded| loaded.path() == Path::new(&config.path))
228 .ok_or_else(|| anyhow::anyhow!("plug-in '{}' was not loaded", config.path))?;
229
230 let entry = configured_entry(loaded.validated_manifest(), &config.path, &config.type_name)?;
231 configured_plugin_adapter(entry, config)
232}
233
234fn configured_plugin_adapter(
235 entry: ConfiguredPluginEntry,
236 config: &PluginConfig,
237) -> anyhow::Result<NodePluginAdapter> {
238 let config_json = serde_json::to_string(&config.config)?;
239
240 match entry {
241 ConfiguredPluginEntry::Actor(entry) => {
242 let actor_id = plugin_actor_id(config)?;
243 let adapter = entry
244 .create_adapter(actor_id, &config_json)
245 .with_context(|| {
246 format!(
247 "failed to instantiate plug-in actor '{}' from {}",
248 config.type_name, config.path
249 )
250 })?;
251 Ok(NodePluginAdapter::Actor(Box::new(adapter)))
252 }
253 ConfiguredPluginEntry::Strategy(entry) => {
254 let strategy_config = plugin_strategy_config(config)?;
255 let adapter = entry
256 .create_adapter(strategy_config, &config_json)
257 .with_context(|| {
258 format!(
259 "failed to instantiate plug-in strategy '{}' from {}",
260 config.type_name, config.path
261 )
262 })?;
263 Ok(NodePluginAdapter::Strategy(Box::new(adapter)))
264 }
265 ConfiguredPluginEntry::Controller(entry) => {
266 let adapter = entry.create_adapter(&config_json).with_context(|| {
267 format!(
268 "failed to instantiate plug-in controller '{}' from {}",
269 config.type_name, config.path
270 )
271 })?;
272 Ok(NodePluginAdapter::Controller(adapter))
273 }
274 }
275}
276
277fn verify_plugin_sha256(config: &PluginConfig) -> anyhow::Result<()> {
278 let Some(expected) = &config.sha256 else {
279 return Ok(());
280 };
281
282 let bytes = std::fs::read(&config.path)
283 .with_context(|| format!("failed to read plug-in '{}'", config.path))?;
284 let actual = hex::encode(digest::digest(&digest::SHA256, &bytes).as_ref());
285 if actual.eq_ignore_ascii_case(expected) {
286 return Ok(());
287 }
288
289 anyhow::bail!(
290 "plug-in '{}' SHA-256 mismatch: expected {}, actual {}",
291 config.path,
292 expected,
293 actual
294 )
295}
296
297fn plugin_actor_id(config: &PluginConfig) -> anyhow::Result<ActorId> {
298 let actor_id = plugin_config_string(config, "actor_id")?.unwrap_or(&config.type_name);
299 ActorId::new_checked(actor_id)
300 .map_err(|e| anyhow::anyhow!("invalid actor_id for plug-in '{}': {e}", config.type_name))
301}
302
303fn plugin_strategy_config(config: &PluginConfig) -> anyhow::Result<StrategyConfig> {
304 let mut strategy_config = if let Some(value) = config.config.get("strategy_config") {
305 serde_json::from_value::<StrategyConfig>(value.clone()).with_context(|| {
306 format!(
307 "invalid strategy_config for plug-in strategy '{}'",
308 config.type_name
309 )
310 })?
311 } else {
312 StrategyConfig::default()
313 };
314
315 if strategy_config.strategy_id.is_none() {
316 let strategy_id = plugin_config_string(config, "strategy_id")?
317 .map_or_else(|| format!("{}-001", config.type_name), str::to_string);
318 strategy_config.strategy_id = Some(StrategyId::new_checked(&strategy_id).map_err(|e| {
319 anyhow::anyhow!(
320 "invalid strategy_id for plug-in strategy '{}': {e}",
321 config.type_name
322 )
323 })?);
324 }
325
326 if strategy_config.order_id_tag.is_none()
327 && let Some(order_id_tag) = plugin_config_string(config, "order_id_tag")?
328 {
329 strategy_config.order_id_tag = Some(order_id_tag.to_string());
330 }
331
332 Ok(strategy_config)
333}
334
335fn plugin_config_string<'a>(
336 config: &'a PluginConfig,
337 key: &'static str,
338) -> anyhow::Result<Option<&'a str>> {
339 match config.config.get(key) {
340 None | Some(serde_json::Value::Null) => Ok(None),
341 Some(serde_json::Value::String(value)) => Ok(Some(value.as_str())),
342 Some(_) => anyhow::bail!(
343 "plug-in '{}' config field '{key}' must be a string",
344 config.type_name
345 ),
346 }
347}
348
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, path::PathBuf};

    use nautilus_core::UUID4;
    use rstest::rstest;

    use super::*;

    #[rstest]
    fn test_verify_plugin_sha256_accepts_matching_uppercase_digest() {
        let bytes = b"plugin bytes";
        let path = write_plugin_bytes(bytes);
        let config = PluginConfig {
            path: path.to_string_lossy().into_owned(),
            type_name: "ExampleActor".to_string(),
            config: HashMap::new(),
            sha256: Some(sha256_hex(bytes).to_uppercase()),
        };

        // Remove the temp file before asserting so a failing assertion cannot leak it.
        let result = verify_plugin_sha256(&config);
        std::fs::remove_file(path).unwrap();

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_verify_plugin_sha256_rejects_mismatch() {
        let path = write_plugin_bytes(b"plugin bytes");
        let config = PluginConfig {
            path: path.to_string_lossy().into_owned(),
            type_name: "ExampleActor".to_string(),
            config: HashMap::new(),
            sha256: Some("0".repeat(64)),
        };

        // Remove the temp file before unwrapping so an unexpected `Ok` cannot leak it.
        let result = verify_plugin_sha256(&config);
        std::fs::remove_file(path).unwrap();

        let error = result.unwrap_err().to_string();
        assert!(error.contains("SHA-256 mismatch"));
    }

    #[rstest]
    fn test_verify_plugin_sha256_reports_missing_file() {
        // The path is generated but never written, so the read must fail.
        let path = plugin_test_path();
        let config = PluginConfig {
            path: path.to_string_lossy().into_owned(),
            type_name: "ExampleActor".to_string(),
            config: HashMap::new(),
            sha256: Some("0".repeat(64)),
        };

        let error = verify_plugin_sha256(&config).unwrap_err().to_string();

        assert!(error.contains("failed to read plug-in"));
    }

    #[rstest]
    fn test_verify_plugin_sha256_skips_missing_digest() {
        // No file exists at this path; success shows the file is not read without a digest.
        let path = plugin_test_path();
        let config = PluginConfig {
            path: path.to_string_lossy().into_owned(),
            type_name: "ExampleActor".to_string(),
            config: HashMap::new(),
            sha256: None,
        };

        assert!(verify_plugin_sha256(&config).is_ok());
    }

    #[rstest]
    fn test_plugin_actor_id_rejects_non_string_actor_id() {
        let config = PluginConfig {
            path: "./libexample.so".to_string(),
            type_name: "ExampleActor".to_string(),
            config: HashMap::from([("actor_id".to_string(), serde_json::json!(42))]),
            sha256: None,
        };

        let error = plugin_actor_id(&config).unwrap_err().to_string();

        assert!(error.contains("actor_id"));
        assert!(error.contains("must be a string"));
    }

    #[rstest]
    fn test_plugin_strategy_config_accepts_nested_strategy_config() {
        let config = PluginConfig {
            path: "./libexample.so".to_string(),
            type_name: "ExampleStrategy".to_string(),
            config: HashMap::from([(
                "strategy_config".to_string(),
                serde_json::json!({
                    "strategy_id": "NestedStrategy-001",
                    "order_id_tag": "NEST",
                }),
            )]),
            sha256: None,
        };

        let strategy_config = plugin_strategy_config(&config).unwrap();

        assert_eq!(
            strategy_config.strategy_id,
            Some(StrategyId::from("NestedStrategy-001"))
        );
        assert_eq!(strategy_config.order_id_tag.as_deref(), Some("NEST"));
    }

    #[rstest]
    fn test_plugin_strategy_config_uses_default_strategy_id() {
        let config = PluginConfig {
            path: "./libexample.so".to_string(),
            type_name: "ExampleStrategy".to_string(),
            config: HashMap::new(),
            sha256: None,
        };

        let strategy_config = plugin_strategy_config(&config).unwrap();

        assert_eq!(
            strategy_config.strategy_id,
            Some(StrategyId::from("ExampleStrategy-001"))
        );
    }

    #[rstest]
    fn test_plugin_strategy_config_uses_top_level_strategy_id_and_order_id_tag() {
        let config = PluginConfig {
            path: "./libexample.so".to_string(),
            type_name: "ExampleStrategy".to_string(),
            config: HashMap::from([
                (
                    "strategy_id".to_string(),
                    serde_json::json!("TopLevelStrategy-001"),
                ),
                ("order_id_tag".to_string(), serde_json::json!("TOP")),
            ]),
            sha256: None,
        };

        let strategy_config = plugin_strategy_config(&config).unwrap();

        assert_eq!(
            strategy_config.strategy_id,
            Some(StrategyId::from("TopLevelStrategy-001"))
        );
        assert_eq!(strategy_config.order_id_tag.as_deref(), Some("TOP"));
    }

    /// Writes `bytes` to a fresh temp-file path and returns that path.
    fn write_plugin_bytes(bytes: &[u8]) -> PathBuf {
        let path = plugin_test_path();
        std::fs::write(&path, bytes).unwrap();
        path
    }

    /// Returns a UUID-named path in the system temp directory; the file is not created.
    fn plugin_test_path() -> PathBuf {
        std::env::temp_dir().join(format!("nautilus-live-plugin-{}.bin", UUID4::new()))
    }

    /// Returns the hex-encoded SHA-256 digest of `bytes`.
    fn sha256_hex(bytes: &[u8]) -> String {
        hex::encode(digest::digest(&digest::SHA256, bytes).as_ref())
    }
}