1use async_trait::async_trait;
11use glob::Pattern;
12use serde_json::json;
13use std::path::PathBuf;
14use std::sync::Arc;
15use tokio::sync::Mutex;
16
17use crate::config::Config;
18use crate::context::Context;
19use crate::errors::{ErrorCode, ModuleError};
20use crate::events::emitter::EventEmitter;
21use crate::module::Module;
22use crate::observability::redaction::DEFAULT_REPLACEMENT;
23use crate::registry::dependencies::resolve_dependencies;
24use crate::registry::registry::Registry;
25use crate::registry::types::DepInfo;
26
27use super::audit::{build_audit_entry, record_audit, AuditAction, AuditChange, AuditStore};
28use super::overrides::{persist_one, write_override, OverridesStore};
29use super::{
30 augment_with_context_identity, emit_event, is_sensitive_key, missing_field_error,
31 require_string, ToggleState, RESTRICTED_KEYS,
32};
33
34pub struct UpdateConfigModule {
40 config: Arc<Mutex<Config>>,
41 emitter: Arc<Mutex<EventEmitter>>,
42 overrides_path: Option<PathBuf>,
43 overrides_store: Option<Arc<dyn OverridesStore>>,
44 audit_store: Option<Arc<dyn AuditStore>>,
45}
46
47impl UpdateConfigModule {
48 pub fn new(config: Arc<Mutex<Config>>, emitter: Arc<Mutex<EventEmitter>>) -> Self {
49 Self {
50 config,
51 emitter,
52 overrides_path: None,
53 overrides_store: None,
54 audit_store: None,
55 }
56 }
57
58 #[must_use]
59 pub fn with_overrides_path(mut self, overrides_path: Option<PathBuf>) -> Self {
60 self.overrides_path = overrides_path;
61 self
62 }
63
64 #[must_use]
71 pub fn with_overrides_store(
72 mut self,
73 overrides_store: Option<Arc<dyn OverridesStore>>,
74 ) -> Self {
75 self.overrides_store = overrides_store;
76 self
77 }
78
79 #[must_use]
80 pub fn with_audit_store(mut self, audit_store: Option<Arc<dyn AuditStore>>) -> Self {
81 self.audit_store = audit_store;
82 self
83 }
84}
85
86#[async_trait]
87impl Module for UpdateConfigModule {
88 fn description(&self) -> &'static str {
89 "Update a runtime configuration value by dot-path key"
90 }
91
92 fn input_schema(&self) -> serde_json::Value {
93 json!({
94 "type": "object",
95 "required": ["key", "value", "reason"],
96 "properties": {
97 "key": {"type": "string"},
98 "value": {},
99 "reason": {"type": "string"}
100 }
101 })
102 }
103
104 fn output_schema(&self) -> serde_json::Value {
105 json!({
106 "type": "object",
107 "properties": {
108 "success": {"type": "boolean"},
109 "key": {"type": "string"},
110 "old_value": {},
111 "new_value": {}
112 }
113 })
114 }
115
116 async fn execute(
117 &self,
118 inputs: serde_json::Value,
119 ctx: &Context<serde_json::Value>,
120 ) -> Result<serde_json::Value, ModuleError> {
121 let key = require_string(&inputs, "key")?;
122 let reason = require_string(&inputs, "reason")?;
123 let value = inputs
124 .get("value")
125 .cloned()
126 .ok_or_else(|| missing_field_error("value"))?;
127
128 if RESTRICTED_KEYS.contains(&key.as_str()) {
129 return Err(ModuleError::new(
133 ErrorCode::ConfigKeyRestricted,
134 format!("Configuration key '{key}' cannot be changed at runtime"),
135 )
136 .with_details([("key".to_string(), json!(key))].into_iter().collect()));
137 }
138
139 let old_value = {
140 let cfg = self.config.lock().await;
141 cfg.get(&key)
142 };
143
144 {
145 let mut cfg = self.config.lock().await;
146 cfg.set(&key, value.clone());
147 }
148
149 if let Some(store) = self.overrides_store.as_ref() {
154 if let Err(e) = persist_one(store.as_ref(), &key, &value).await {
155 tracing::warn!(error = %e, key = %key, "OverridesStore persist failed");
156 }
157 } else if let Some(path) = self.overrides_path.as_deref() {
158 write_override(path, &key, &value);
159 }
160
161 let sensitive = is_sensitive_key(&key);
167 let redacted_old: serde_json::Value = if sensitive {
168 json!(DEFAULT_REPLACEMENT)
169 } else {
170 old_value.clone().unwrap_or(serde_json::Value::Null)
171 };
172 let redacted_new: serde_json::Value = if sensitive {
173 json!(DEFAULT_REPLACEMENT)
174 } else {
175 value.clone()
176 };
177
178 let timestamp = chrono::Utc::now().to_rfc3339();
179 let event_data = augment_with_context_identity(
182 json!({
183 "key": key,
184 "old_value": redacted_old,
185 "new_value": redacted_new,
186 "reason": reason,
187 }),
188 ctx,
189 );
190
191 emit_event(
192 &self.emitter,
193 "apcore.config.updated",
194 "system.control.update_config",
195 ×tamp,
196 event_data,
197 )
198 .await;
199
200 if sensitive {
201 tracing::info!(key = %key, reason = %reason, "Config updated: old_value=*** new_value=***");
202 } else {
203 tracing::info!(
204 key = %key,
205 old_value = ?old_value,
206 new_value = ?value,
207 reason = %reason,
208 "Config updated"
209 );
210 }
211
212 let entry = build_audit_entry(
213 AuditAction::UpdateConfig,
214 "system.control.update_config",
215 ctx,
216 AuditChange {
217 before: redacted_old.clone(),
218 after: redacted_new.clone(),
219 },
220 );
221 record_audit(self.audit_store.as_ref(), entry).await;
222
223 Ok(json!({
224 "success": true,
225 "key": key,
226 "old_value": redacted_old,
227 "new_value": redacted_new,
228 }))
229 }
230}
231
232pub struct ReloadModule {
253 registry: Arc<Registry>,
254 emitter: Arc<Mutex<EventEmitter>>,
255 audit_store: Option<Arc<dyn AuditStore>>,
256 config: Option<Arc<Mutex<Config>>>,
257}
258
259impl ReloadModule {
260 pub fn new(registry: Arc<Registry>, emitter: Arc<Mutex<EventEmitter>>) -> Self {
261 Self {
262 registry,
263 emitter,
264 audit_store: None,
265 config: None,
266 }
267 }
268
269 #[must_use]
270 pub fn with_audit_store(mut self, audit_store: Option<Arc<dyn AuditStore>>) -> Self {
271 self.audit_store = audit_store;
272 self
273 }
274
275 #[must_use]
279 pub fn with_config(mut self, config: Option<Arc<Mutex<Config>>>) -> Self {
280 self.config = config;
281 self
282 }
283
284 fn topo_sort_modules(&self, matched: &[String]) -> Vec<String> {
289 let matched_set: std::collections::HashSet<String> = matched.iter().cloned().collect();
290 let entries: Vec<(String, Vec<DepInfo>)> = matched
291 .iter()
292 .map(|mid| {
293 let deps: Vec<DepInfo> = self
294 .registry
295 .get_definition(mid)
296 .ok()
297 .flatten()
298 .map(|d| {
299 d.dependencies
300 .into_iter()
301 .filter(|dep| matched_set.contains(&dep.module_id))
302 .map(|dep| DepInfo {
303 module_id: dep.module_id,
304 version: if dep.version_constraint.is_empty() {
305 None
306 } else {
307 Some(dep.version_constraint)
308 },
309 optional: dep.optional,
310 })
311 .collect()
312 })
313 .unwrap_or_default();
314 (mid.clone(), deps)
315 })
316 .collect();
317
318 match resolve_dependencies(&entries, Some(&matched_set), None) {
319 Ok(order) => order,
320 Err(e) => {
321 tracing::warn!(
322 error = %e,
323 "Topological sort failed for path_filter reload; falling back to alphabetical"
324 );
325 let mut sorted = matched.to_vec();
326 sorted.sort();
327 sorted
328 }
329 }
330 }
331
332 #[allow(
333 clippy::too_many_lines,
334 clippy::single_match_else,
335 clippy::map_unwrap_or
336 )]
337 async fn execute_single(
338 &self,
339 module_id: String,
340 reason: &str,
341 ctx: &Context<serde_json::Value>,
342 ) -> Result<serde_json::Value, ModuleError> {
343 let start = std::time::Instant::now();
355
356 if !self.registry.has(&module_id) {
357 return Err(ModuleError::new(
358 ErrorCode::ModuleNotFound,
359 format!("Module '{module_id}' not found"),
360 ));
361 }
362
363 let previous_version = self
365 .registry
366 .get_definition(&module_id)
367 .ok()
368 .flatten()
369 .map(|d| d.version)
370 .unwrap_or_else(|| "unknown".to_string());
371
372 let suspended_state = match self.registry.get(&module_id) {
376 Ok(Some(module)) => {
377 let module_for_panic = Arc::clone(&module);
378 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
379 module_for_panic.on_suspend()
380 })) {
381 Ok(state) => state,
382 Err(_) => {
383 tracing::warn!(
384 module_id = %module_id,
385 "Module on_suspend panicked; continuing reload"
386 );
387 None
388 }
389 }
390 }
391 _ => None,
392 };
393
394 self.registry.safe_unregister(&module_id, 5000).await?;
396
397 match self.registry.discover_internal().await {
402 Ok(count) => tracing::debug!(
403 module_id = %module_id,
404 count,
405 "Reload: discover_internal repopulated registry"
406 ),
407 Err(e) => tracing::warn!(
408 module_id = %module_id,
409 error = %e.message,
410 "Reload: discover_internal returned error (best-effort, continuing)"
411 ),
412 }
413
414 let new_version = self
420 .registry
421 .get_definition(&module_id)
422 .ok()
423 .flatten()
424 .map(|d| d.version)
425 .unwrap_or_else(|| previous_version.clone());
426
427 if let Some(state) = suspended_state {
428 if let Ok(Some(module)) = self.registry.get(&module_id) {
429 let module_for_panic = Arc::clone(&module);
430 if std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
431 module_for_panic.on_resume(state);
432 }))
433 .is_err()
434 {
435 tracing::warn!(
436 module_id = %module_id,
437 "Module on_resume panicked; reload still considered successful"
438 );
439 }
440 }
441 }
442
443 let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
445 let timestamp = chrono::Utc::now().to_rfc3339();
446 let event_data = augment_with_context_identity(
451 json!({
452 "module_id": module_id,
453 "previous_version": previous_version,
454 "new_version": new_version,
455 "reason": reason,
456 }),
457 ctx,
458 );
459 emit_event(
460 &self.emitter,
461 "apcore.module.reloaded",
462 &module_id,
463 ×tamp,
464 event_data,
465 )
466 .await;
467
468 tracing::info!(
470 module_id = %module_id,
471 previous_version = %previous_version,
472 new_version = %new_version,
473 reason = %reason,
474 "Module reloaded"
475 );
476
477 let entry = build_audit_entry(
478 AuditAction::ReloadModule,
479 &module_id,
480 ctx,
481 AuditChange {
482 before: json!(previous_version),
483 after: json!(new_version),
484 },
485 );
486 record_audit(self.audit_store.as_ref(), entry).await;
487
488 Ok(json!({
489 "success": true,
490 "module_id": module_id,
491 "previous_version": previous_version,
492 "new_version": new_version,
493 "reload_duration_ms": elapsed_ms,
494 }))
495 }
496
497 async fn execute_bulk(
498 &self,
499 path_filter: String,
500 reason: &str,
501 ctx: &Context<serde_json::Value>,
502 ) -> Result<serde_json::Value, ModuleError> {
503 let pattern = Pattern::new(&path_filter).map_err(|e| {
504 ModuleError::new(
505 ErrorCode::GeneralInvalidInput,
506 format!("'path_filter' is not a valid glob pattern: {e}"),
507 )
508 })?;
509
510 let mut matched: Vec<String> = self
511 .registry
512 .module_ids()
513 .into_iter()
514 .filter(|id| pattern.matches(id))
515 .collect();
516 matched.sort();
517
518 let order = self.topo_sort_modules(&matched);
519 let start = std::time::Instant::now();
520
521 let mut reloaded: Vec<String> = Vec::new();
522 for mid in order {
523 if !self.registry.has(&mid) {
524 continue;
525 }
526 match self.registry.safe_unregister(&mid, 5000).await {
527 Ok(_) => {
528 let timestamp = chrono::Utc::now().to_rfc3339();
529 let event_data = augment_with_context_identity(
530 json!({
531 "previous_version": "unknown",
532 "new_version": "unknown",
533 "reason": reason,
534 }),
535 ctx,
536 );
537 emit_event(
538 &self.emitter,
539 "apcore.module.reloaded",
540 &mid,
541 ×tamp,
542 event_data,
543 )
544 .await;
545 reloaded.push(mid);
546 }
547 Err(e) => {
548 tracing::error!(error = %e, module_id = %mid, "Bulk reload: failed to unregister");
549 }
550 }
551 }
552
553 let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
554 tracing::info!(
555 count = reloaded.len(),
556 path_filter = %path_filter,
557 reason = %reason,
558 "Bulk module reload"
559 );
560
561 let entry = build_audit_entry(
562 AuditAction::ReloadModule,
563 &path_filter,
564 ctx,
565 AuditChange {
566 before: serde_json::Value::Null,
567 after: json!(reloaded.clone()),
568 },
569 );
570 record_audit(self.audit_store.as_ref(), entry).await;
571
572 Ok(json!({
573 "success": true,
574 "module_id": serde_json::Value::Null,
575 "reloaded_modules": reloaded,
576 "reload_duration_ms": elapsed_ms,
577 }))
578 }
579}
580
581#[async_trait]
582impl Module for ReloadModule {
583 fn description(&self) -> &'static str {
584 "Hot-reload a module by safe unregister (re-registration must be done explicitly in Rust)"
585 }
586
587 fn input_schema(&self) -> serde_json::Value {
588 json!({
589 "type": "object",
590 "required": ["reason"],
591 "properties": {
592 "module_id": {"type": "string"},
593 "path_filter": {"type": "string"},
594 "reload_dependents": {"type": "boolean", "default": false},
595 "reload_config": {"type": "boolean", "default": false},
596 "reason": {"type": "string"}
597 }
598 })
599 }
600
601 fn output_schema(&self) -> serde_json::Value {
602 json!({
603 "type": "object",
604 "properties": {
605 "success": {"type": "boolean"},
606 "module_id": {"type": ["string", "null"]},
607 "previous_version": {"type": "string"},
608 "new_version": {"type": "string"},
609 "reload_duration_ms": {"type": "number"},
610 "reloaded_modules": {"type": "array", "items": {"type": "string"}}
611 }
612 })
613 }
614
615 async fn execute(
616 &self,
617 inputs: serde_json::Value,
618 ctx: &Context<serde_json::Value>,
619 ) -> Result<serde_json::Value, ModuleError> {
620 let reason = require_string(&inputs, "reason")?;
621
622 let module_id_input = inputs
623 .get("module_id")
624 .filter(|v| !v.is_null())
625 .and_then(|v| v.as_str())
626 .filter(|s| !s.is_empty());
627 let path_filter_input = inputs
628 .get("path_filter")
629 .filter(|v| !v.is_null())
630 .and_then(|v| v.as_str())
631 .filter(|s| !s.is_empty());
632
633 if module_id_input.is_some() && path_filter_input.is_some() {
634 return Err(ModuleError::new(
635 ErrorCode::ModuleReloadConflict,
636 "'module_id' and 'path_filter' are mutually exclusive",
637 ));
638 }
639
640 let reload_config_flag = inputs
641 .get("reload_config")
642 .and_then(serde_json::Value::as_bool)
643 .unwrap_or(false);
644
645 let mut config_reloaded = false;
649 if reload_config_flag {
650 if let Some(cfg_handle) = self.config.as_ref() {
651 let mut cfg = cfg_handle.lock().await;
652 match cfg.reload_from_disk() {
653 Ok(()) => {
654 config_reloaded = true;
655 let timestamp = chrono::Utc::now().to_rfc3339();
656 emit_event(
657 &self.emitter,
658 "apcore.config.reloaded",
659 "system.control.reload_module",
660 ×tamp,
661 json!({"reason": reason}),
662 )
663 .await;
664 tracing::info!(reason = %reason, "Config reloaded from disk");
665 }
666 Err(e) => {
667 tracing::warn!(
668 error = %e.message,
669 "reload_config: Config::reload_from_disk failed (continuing)"
670 );
671 }
672 }
673 } else {
674 tracing::warn!(
675 "reload_config: requested but no Config bound to ReloadModule \
676 (use ReloadModule::with_config to enable)"
677 );
678 }
679 }
680
681 if let Some(filter) = path_filter_input {
682 let mut result = self.execute_bulk(filter.to_string(), &reason, ctx).await?;
683 if let Some(obj) = result.as_object_mut() {
684 obj.insert("config_reloaded".to_string(), json!(config_reloaded));
685 }
686 return Ok(result);
687 }
688
689 if module_id_input.is_none() && reload_config_flag {
693 return Ok(json!({
694 "success": true,
695 "module_id": serde_json::Value::Null,
696 "config_reloaded": config_reloaded,
697 }));
698 }
699
700 let module_id = module_id_input.ok_or_else(|| {
701 ModuleError::new(
702 ErrorCode::GeneralInvalidInput,
703 "'module_id', 'path_filter', or 'reload_config' is required",
704 )
705 })?;
706
707 let mut result = self
708 .execute_single(module_id.to_string(), &reason, ctx)
709 .await?;
710 if let Some(obj) = result.as_object_mut() {
711 obj.insert("config_reloaded".to_string(), json!(config_reloaded));
712 }
713 Ok(result)
714 }
715}
716
717pub struct ToggleFeatureModule {
723 registry: Arc<Registry>,
724 emitter: Arc<Mutex<EventEmitter>>,
725 toggle_state: Arc<ToggleState>,
726 overrides_path: Option<PathBuf>,
727 overrides_store: Option<Arc<dyn OverridesStore>>,
728 audit_store: Option<Arc<dyn AuditStore>>,
729}
730
731impl ToggleFeatureModule {
732 pub fn new(
733 registry: Arc<Registry>,
734 emitter: Arc<Mutex<EventEmitter>>,
735 toggle_state: Arc<ToggleState>,
736 ) -> Self {
737 Self {
738 registry,
739 emitter,
740 toggle_state,
741 overrides_path: None,
742 overrides_store: None,
743 audit_store: None,
744 }
745 }
746
747 #[must_use]
748 pub fn with_overrides_path(mut self, overrides_path: Option<PathBuf>) -> Self {
749 self.overrides_path = overrides_path;
750 self
751 }
752
753 #[must_use]
756 pub fn with_overrides_store(
757 mut self,
758 overrides_store: Option<Arc<dyn OverridesStore>>,
759 ) -> Self {
760 self.overrides_store = overrides_store;
761 self
762 }
763
764 #[must_use]
765 pub fn with_audit_store(mut self, audit_store: Option<Arc<dyn AuditStore>>) -> Self {
766 self.audit_store = audit_store;
767 self
768 }
769}
770
771#[async_trait]
772impl Module for ToggleFeatureModule {
773 fn description(&self) -> &'static str {
774 "Disable or enable a module without unloading it"
775 }
776
777 fn input_schema(&self) -> serde_json::Value {
778 json!({
779 "type": "object",
780 "required": ["module_id", "enabled", "reason"],
781 "properties": {
782 "module_id": {"type": "string"},
783 "enabled": {"type": "boolean"},
784 "reason": {"type": "string"}
785 }
786 })
787 }
788
789 fn output_schema(&self) -> serde_json::Value {
790 json!({
791 "type": "object",
792 "properties": {
793 "success": {"type": "boolean"},
794 "module_id": {"type": "string"},
795 "enabled": {"type": "boolean"}
796 }
797 })
798 }
799
800 async fn execute(
801 &self,
802 inputs: serde_json::Value,
803 ctx: &Context<serde_json::Value>,
804 ) -> Result<serde_json::Value, ModuleError> {
805 let module_id = require_string(&inputs, "module_id")?;
806 let reason = require_string(&inputs, "reason")?;
807 let enabled = inputs
808 .get("enabled")
809 .and_then(serde_json::Value::as_bool)
810 .ok_or_else(|| {
811 ModuleError::new(
812 ErrorCode::GeneralInvalidInput,
813 "'enabled' is required and must be a boolean",
814 )
815 })?;
816
817 if !self.registry.has(&module_id) {
818 return Err(ModuleError::new(
819 ErrorCode::ModuleNotFound,
820 format!("Module '{module_id}' not found"),
821 ));
822 }
823
824 let before_enabled = !self.toggle_state.is_disabled(&module_id);
825
826 if enabled {
831 self.registry.enable(&module_id)?;
832 self.toggle_state.enable(&module_id);
833 } else {
834 self.registry.disable(&module_id)?;
835 self.toggle_state.disable(&module_id);
836 }
837
838 let toggle_key = format!("toggle.{module_id}");
839 let toggle_value = serde_json::Value::Bool(enabled);
840 if let Some(store) = self.overrides_store.as_ref() {
841 if let Err(e) = persist_one(store.as_ref(), &toggle_key, &toggle_value).await {
842 tracing::warn!(error = %e, key = %toggle_key, "OverridesStore persist failed");
843 }
844 } else if let Some(path) = self.overrides_path.as_deref() {
845 write_override(path, &toggle_key, &toggle_value);
846 }
847
848 let timestamp = chrono::Utc::now().to_rfc3339();
849 let event_data = augment_with_context_identity(
854 json!({
855 "module_id": module_id,
856 "enabled": enabled,
857 "reason": reason,
858 }),
859 ctx,
860 );
861 emit_event(
862 &self.emitter,
863 "apcore.module.toggled",
864 &module_id,
865 ×tamp,
866 event_data,
867 )
868 .await;
869
870 tracing::info!(
871 module_id = %module_id,
872 enabled = %enabled,
873 reason = %reason,
874 "Module toggled"
875 );
876
877 let entry = build_audit_entry(
878 AuditAction::ToggleFeature,
879 &module_id,
880 ctx,
881 AuditChange {
882 before: json!(before_enabled),
883 after: json!(enabled),
884 },
885 );
886 record_audit(self.audit_store.as_ref(), entry).await;
887
888 Ok(json!({
889 "success": true,
890 "module_id": module_id,
891 "enabled": enabled,
892 }))
893 }
894}