Skip to main content

apcore/sys_modules/
control.rs

1// APCore Protocol — System control modules
2// Spec reference: system.control.update_config (F11), system.control.reload_module (F10),
3//                 system.control.toggle_feature (F19)
4// Hardening (Issue #45 / system-modules.md §1.1–§1.4):
5//   §1.1 — overrides_path persistence for update_config + toggle_feature
6//   §1.2 — contextual AuditEntry recorded for every state-changing call
7//   §1.4 — path_filter glob (mutually exclusive with module_id) and
8//          dependency-topological reload order
9
10use async_trait::async_trait;
11use glob::Pattern;
12use serde_json::json;
13use std::path::PathBuf;
14use std::sync::Arc;
15use tokio::sync::Mutex;
16
17use crate::config::Config;
18use crate::context::Context;
19use crate::errors::{ErrorCode, ModuleError};
20use crate::events::emitter::EventEmitter;
21use crate::module::Module;
22use crate::observability::redaction::DEFAULT_REPLACEMENT;
23use crate::registry::dependencies::resolve_dependencies;
24use crate::registry::registry::Registry;
25use crate::registry::types::DepInfo;
26
27use super::audit::{build_audit_entry, record_audit, AuditAction, AuditChange, AuditStore};
28use super::overrides::{persist_one, write_override, OverridesStore};
29use super::{
30    augment_with_context_identity, emit_event, is_sensitive_key, missing_field_error,
31    require_string, ToggleState, RESTRICTED_KEYS,
32};
33
34// ---------------------------------------------------------------------------
35// UpdateConfigModule (F11) — runtime config mutation with optional persistence
36// ---------------------------------------------------------------------------
37
38/// Update a runtime configuration value by dot-path key (F11).
39pub struct UpdateConfigModule {
40    config: Arc<Mutex<Config>>,
41    emitter: Arc<Mutex<EventEmitter>>,
42    overrides_path: Option<PathBuf>,
43    overrides_store: Option<Arc<dyn OverridesStore>>,
44    audit_store: Option<Arc<dyn AuditStore>>,
45}
46
47impl UpdateConfigModule {
48    pub fn new(config: Arc<Mutex<Config>>, emitter: Arc<Mutex<EventEmitter>>) -> Self {
49        Self {
50            config,
51            emitter,
52            overrides_path: None,
53            overrides_store: None,
54            audit_store: None,
55        }
56    }
57
58    #[must_use]
59    pub fn with_overrides_path(mut self, overrides_path: Option<PathBuf>) -> Self {
60        self.overrides_path = overrides_path;
61        self
62    }
63
64    /// Bind a pluggable [`OverridesStore`] for persistence.
65    ///
66    /// When set, takes precedence over `overrides_path`. The store is used to
67    /// perform a read-modify-write of the supplied `key` after the in-memory
68    /// `Config` has been mutated. Cross-language: matches the `OverridesStore`
69    /// parameter accepted by `apcore-python` and `apcore-typescript`.
70    #[must_use]
71    pub fn with_overrides_store(
72        mut self,
73        overrides_store: Option<Arc<dyn OverridesStore>>,
74    ) -> Self {
75        self.overrides_store = overrides_store;
76        self
77    }
78
79    #[must_use]
80    pub fn with_audit_store(mut self, audit_store: Option<Arc<dyn AuditStore>>) -> Self {
81        self.audit_store = audit_store;
82        self
83    }
84}
85
86#[async_trait]
87impl Module for UpdateConfigModule {
88    fn description(&self) -> &'static str {
89        "Update a runtime configuration value by dot-path key"
90    }
91
92    fn input_schema(&self) -> serde_json::Value {
93        json!({
94            "type": "object",
95            "required": ["key", "value", "reason"],
96            "properties": {
97                "key":    {"type": "string"},
98                "value":  {},
99                "reason": {"type": "string"}
100            }
101        })
102    }
103
104    fn output_schema(&self) -> serde_json::Value {
105        json!({
106            "type": "object",
107            "properties": {
108                "success":   {"type": "boolean"},
109                "key":       {"type": "string"},
110                "old_value": {},
111                "new_value": {}
112            }
113        })
114    }
115
116    async fn execute(
117        &self,
118        inputs: serde_json::Value,
119        ctx: &Context<serde_json::Value>,
120    ) -> Result<serde_json::Value, ModuleError> {
121        let key = require_string(&inputs, "key")?;
122        let reason = require_string(&inputs, "reason")?;
123        let value = inputs
124            .get("value")
125            .cloned()
126            .ok_or_else(|| missing_field_error("value"))?;
127
128        if RESTRICTED_KEYS.contains(&key.as_str()) {
129            // D-25: emit a distinct CONFIG_KEY_RESTRICTED code so callers can
130            // match the policy-deny case independently of value-shape errors
131            // (Python/TS already do this).
132            return Err(ModuleError::new(
133                ErrorCode::ConfigKeyRestricted,
134                format!("Configuration key '{key}' cannot be changed at runtime"),
135            )
136            .with_details([("key".to_string(), json!(key))].into_iter().collect()));
137        }
138
139        let old_value = {
140            let cfg = self.config.lock().await;
141            cfg.get(&key)
142        };
143
144        {
145            let mut cfg = self.config.lock().await;
146            cfg.set(&key, value.clone());
147        }
148
149        // Persist *after* the in-memory mutation succeeded so a write failure
150        // cannot poison the runtime state. Errors are logged and not
151        // propagated — overrides persistence is best-effort.
152        // Pluggable store takes precedence over the legacy file path.
153        if let Some(store) = self.overrides_store.as_ref() {
154            if let Err(e) = persist_one(store.as_ref(), &key, &value).await {
155                tracing::warn!(error = %e, key = %key, "OverridesStore persist failed");
156            }
157        } else if let Some(path) = self.overrides_path.as_deref() {
158            write_override(path, &key, &value);
159        }
160
161        // §1.2 + spec §F11 lines 337-339: redact `old_value`/`new_value` in the
162        // emitted event, the audit entry, and the response payload when `key`
163        // matches a sensitive segment. The in-memory `Config` still holds the
164        // real value — the sentinel only blocks egress to logs / events / audit
165        // store / RPC response.
166        let sensitive = is_sensitive_key(&key);
167        let redacted_old: serde_json::Value = if sensitive {
168            json!(DEFAULT_REPLACEMENT)
169        } else {
170            old_value.clone().unwrap_or(serde_json::Value::Null)
171        };
172        let redacted_new: serde_json::Value = if sensitive {
173            json!(DEFAULT_REPLACEMENT)
174        } else {
175            value.clone()
176        };
177
178        let timestamp = chrono::Utc::now().to_rfc3339();
179        // Issue #45.2 — contextual auditing: augment payload with caller_id
180        // (defaulted to "@external") and identity from the Context.
181        let event_data = augment_with_context_identity(
182            json!({
183                "key": key,
184                "old_value": redacted_old,
185                "new_value": redacted_new,
186                "reason": reason,
187            }),
188            ctx,
189        );
190
191        emit_event(
192            &self.emitter,
193            "apcore.config.updated",
194            "system.control.update_config",
195            &timestamp,
196            event_data,
197        )
198        .await;
199
200        if sensitive {
201            tracing::info!(key = %key, reason = %reason, "Config updated: old_value=*** new_value=***");
202        } else {
203            tracing::info!(
204                key = %key,
205                old_value = ?old_value,
206                new_value = ?value,
207                reason = %reason,
208                "Config updated"
209            );
210        }
211
212        let entry = build_audit_entry(
213            AuditAction::UpdateConfig,
214            "system.control.update_config",
215            ctx,
216            AuditChange {
217                before: redacted_old.clone(),
218                after: redacted_new.clone(),
219            },
220        );
221        record_audit(self.audit_store.as_ref(), entry).await;
222
223        Ok(json!({
224            "success": true,
225            "key": key,
226            "old_value": redacted_old,
227            "new_value": redacted_new,
228        }))
229    }
230}
231
232// ---------------------------------------------------------------------------
233// ReloadModule (F10) — single + bulk path_filter reload
234// ---------------------------------------------------------------------------
235
236/// Hot-reload a module via safe unregister (F10).
237///
238/// Full re-discovery is not supported in Rust (no dynamic loading); the
239/// module is unregistered and callers must re-register manually. The reload
240/// event is always emitted with `new_version` == `previous_version`.
241///
242/// When `path_filter` is supplied instead of `module_id`, every module ID
243/// matching the glob pattern is reloaded in dependency-topological order
244/// (leaves first). Supplying both inputs raises `MODULE_RELOAD_CONFLICT`.
245///
246/// When the input contains `reload_config: true`, the bound [`Config`] (if
247/// supplied via [`Self::with_config`]) is refreshed via
248/// [`Config::reload_from_disk`] and an `apcore.config.reloaded` event is
249/// emitted. Issue #45.5 — Rust cannot dynamically swap compiled module code
250/// (`.so`/`.rlib`), but static configuration MUST be reloadable without a
251/// binary restart.
252pub struct ReloadModule {
253    registry: Arc<Registry>,
254    emitter: Arc<Mutex<EventEmitter>>,
255    audit_store: Option<Arc<dyn AuditStore>>,
256    config: Option<Arc<Mutex<Config>>>,
257}
258
259impl ReloadModule {
260    pub fn new(registry: Arc<Registry>, emitter: Arc<Mutex<EventEmitter>>) -> Self {
261        Self {
262            registry,
263            emitter,
264            audit_store: None,
265            config: None,
266        }
267    }
268
269    #[must_use]
270    pub fn with_audit_store(mut self, audit_store: Option<Arc<dyn AuditStore>>) -> Self {
271        self.audit_store = audit_store;
272        self
273    }
274
275    /// Bind a runtime [`Config`] so `reload_config: true` invocations can
276    /// refresh static configuration via [`Config::reload_from_disk`].
277    /// Issue #45.5.
278    #[must_use]
279    pub fn with_config(mut self, config: Option<Arc<Mutex<Config>>>) -> Self {
280        self.config = config;
281        self
282    }
283
284    /// Topologically sort the matched module IDs (leaves first). Falls back
285    /// to alphabetical order if the dependency graph contains a cycle or
286    /// references a missing module — the reload still happens, just without
287    /// the optimal order.
288    fn topo_sort_modules(&self, matched: &[String]) -> Vec<String> {
289        let matched_set: std::collections::HashSet<String> = matched.iter().cloned().collect();
290        let entries: Vec<(String, Vec<DepInfo>)> = matched
291            .iter()
292            .map(|mid| {
293                let deps: Vec<DepInfo> = self
294                    .registry
295                    .get_definition(mid)
296                    .ok()
297                    .flatten()
298                    .map(|d| {
299                        d.dependencies
300                            .into_iter()
301                            .filter(|dep| matched_set.contains(&dep.module_id))
302                            .map(|dep| DepInfo {
303                                module_id: dep.module_id,
304                                version: if dep.version_constraint.is_empty() {
305                                    None
306                                } else {
307                                    Some(dep.version_constraint)
308                                },
309                                optional: dep.optional,
310                            })
311                            .collect()
312                    })
313                    .unwrap_or_default();
314                (mid.clone(), deps)
315            })
316            .collect();
317
318        match resolve_dependencies(&entries, Some(&matched_set), None) {
319            Ok(order) => order,
320            Err(e) => {
321                tracing::warn!(
322                    error = %e,
323                    "Topological sort failed for path_filter reload; falling back to alphabetical"
324                );
325                let mut sorted = matched.to_vec();
326                sorted.sort();
327                sorted
328            }
329        }
330    }
331
332    #[allow(
333        clippy::too_many_lines,
334        clippy::single_match_else,
335        clippy::map_unwrap_or
336    )]
337    async fn execute_single(
338        &self,
339        module_id: String,
340        reason: &str,
341        ctx: &Context<serde_json::Value>,
342    ) -> Result<serde_json::Value, ModuleError> {
343        // Sync SM-006: implement the 8-step reload pipeline aligned with
344        // apcore-python (sys_modules/control.py:412-458) and apcore-typescript
345        // (sys-modules/control.ts:186-254).
346        //   1. capture_previous_version
347        //   2. on_suspend (best-effort)
348        //   3. safe_unregister
349        //   4. registry.discover_internal() (re-discover modules)
350        //   5. register_internal (no-op when discoverer reinstates)
351        //   6. on_resume (best-effort)
352        //   7. emit_reloaded event with actual previous + new versions
353        //   8. log
354        let start = std::time::Instant::now();
355
356        if !self.registry.has(&module_id) {
357            return Err(ModuleError::new(
358                ErrorCode::ModuleNotFound,
359                format!("Module '{module_id}' not found"),
360            ));
361        }
362
363        // (1) Capture previous version from the registry descriptor.
364        let previous_version = self
365            .registry
366            .get_definition(&module_id)
367            .ok()
368            .flatten()
369            .map(|d| d.version)
370            .unwrap_or_else(|| "unknown".to_string());
371
372        // (2) on_suspend (best-effort) — capture state for handoff to on_resume.
373        // Panics inside the user-supplied trait method are caught so a faulty
374        // hook cannot abort the reload.
375        let suspended_state = match self.registry.get(&module_id) {
376            Ok(Some(module)) => {
377                let module_for_panic = Arc::clone(&module);
378                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
379                    module_for_panic.on_suspend()
380                })) {
381                    Ok(state) => state,
382                    Err(_) => {
383                        tracing::warn!(
384                            module_id = %module_id,
385                            "Module on_suspend panicked; continuing reload"
386                        );
387                        None
388                    }
389                }
390            }
391            _ => None,
392        };
393
394        // (3) Safe unregister — drains in-flight calls.
395        self.registry.safe_unregister(&module_id, 5000).await?;
396
397        // (4) Re-run the configured discoverer to repopulate the registry.
398        // Best-effort: if no discoverer is attached (NoDiscovererConfigured)
399        // or discovery fails, log and continue — the SDK still emits the
400        // reload event so observers are notified.
401        match self.registry.discover_internal().await {
402            Ok(count) => tracing::debug!(
403                module_id = %module_id,
404                count,
405                "Reload: discover_internal repopulated registry"
406            ),
407            Err(e) => tracing::warn!(
408                module_id = %module_id,
409                error = %e.message,
410                "Reload: discover_internal returned error (best-effort, continuing)"
411            ),
412        }
413
414        // (5) register_internal: in Rust we don't carry stand-alone factory
415        // closures, so re-registration is delegated to the discoverer in step
416        // 4. This branch is intentionally a no-op for cross-language parity.
417
418        // (6) on_resume (best-effort) — handoff state to the freshly loaded module.
419        let new_version = self
420            .registry
421            .get_definition(&module_id)
422            .ok()
423            .flatten()
424            .map(|d| d.version)
425            .unwrap_or_else(|| previous_version.clone());
426
427        if let Some(state) = suspended_state {
428            if let Ok(Some(module)) = self.registry.get(&module_id) {
429                let module_for_panic = Arc::clone(&module);
430                if std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
431                    module_for_panic.on_resume(state);
432                }))
433                .is_err()
434                {
435                    tracing::warn!(
436                        module_id = %module_id,
437                        "Module on_resume panicked; reload still considered successful"
438                    );
439                }
440            }
441        }
442
443        // (7) Emit the reloaded event with actual versions.
444        let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
445        let timestamp = chrono::Utc::now().to_rfc3339();
446        // Issue #45.2 — augment with caller_id / identity from the Context.
447        // Include `module_id` in the data payload for cross-language parity
448        // (Python/TS both inline it so subscribers reading only `data` see
449        // the affected target).
450        let event_data = augment_with_context_identity(
451            json!({
452                "module_id": module_id,
453                "previous_version": previous_version,
454                "new_version": new_version,
455                "reason": reason,
456            }),
457            ctx,
458        );
459        emit_event(
460            &self.emitter,
461            "apcore.module.reloaded",
462            &module_id,
463            &timestamp,
464            event_data,
465        )
466        .await;
467
468        // (8) Structured log.
469        tracing::info!(
470            module_id = %module_id,
471            previous_version = %previous_version,
472            new_version = %new_version,
473            reason = %reason,
474            "Module reloaded"
475        );
476
477        let entry = build_audit_entry(
478            AuditAction::ReloadModule,
479            &module_id,
480            ctx,
481            AuditChange {
482                before: json!(previous_version),
483                after: json!(new_version),
484            },
485        );
486        record_audit(self.audit_store.as_ref(), entry).await;
487
488        Ok(json!({
489            "success": true,
490            "module_id": module_id,
491            "previous_version": previous_version,
492            "new_version": new_version,
493            "reload_duration_ms": elapsed_ms,
494        }))
495    }
496
497    async fn execute_bulk(
498        &self,
499        path_filter: String,
500        reason: &str,
501        ctx: &Context<serde_json::Value>,
502    ) -> Result<serde_json::Value, ModuleError> {
503        let pattern = Pattern::new(&path_filter).map_err(|e| {
504            ModuleError::new(
505                ErrorCode::GeneralInvalidInput,
506                format!("'path_filter' is not a valid glob pattern: {e}"),
507            )
508        })?;
509
510        let mut matched: Vec<String> = self
511            .registry
512            .module_ids()
513            .into_iter()
514            .filter(|id| pattern.matches(id))
515            .collect();
516        matched.sort();
517
518        let order = self.topo_sort_modules(&matched);
519        let start = std::time::Instant::now();
520
521        let mut reloaded: Vec<String> = Vec::new();
522        for mid in order {
523            if !self.registry.has(&mid) {
524                continue;
525            }
526            match self.registry.safe_unregister(&mid, 5000).await {
527                Ok(_) => {
528                    let timestamp = chrono::Utc::now().to_rfc3339();
529                    let event_data = augment_with_context_identity(
530                        json!({
531                            "previous_version": "unknown",
532                            "new_version": "unknown",
533                            "reason": reason,
534                        }),
535                        ctx,
536                    );
537                    emit_event(
538                        &self.emitter,
539                        "apcore.module.reloaded",
540                        &mid,
541                        &timestamp,
542                        event_data,
543                    )
544                    .await;
545                    reloaded.push(mid);
546                }
547                Err(e) => {
548                    tracing::error!(error = %e, module_id = %mid, "Bulk reload: failed to unregister");
549                }
550            }
551        }
552
553        let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
554        tracing::info!(
555            count = reloaded.len(),
556            path_filter = %path_filter,
557            reason = %reason,
558            "Bulk module reload"
559        );
560
561        let entry = build_audit_entry(
562            AuditAction::ReloadModule,
563            &path_filter,
564            ctx,
565            AuditChange {
566                before: serde_json::Value::Null,
567                after: json!(reloaded.clone()),
568            },
569        );
570        record_audit(self.audit_store.as_ref(), entry).await;
571
572        Ok(json!({
573            "success": true,
574            "module_id": serde_json::Value::Null,
575            "reloaded_modules": reloaded,
576            "reload_duration_ms": elapsed_ms,
577        }))
578    }
579}
580
581#[async_trait]
582impl Module for ReloadModule {
583    fn description(&self) -> &'static str {
584        "Hot-reload a module by safe unregister (re-registration must be done explicitly in Rust)"
585    }
586
587    fn input_schema(&self) -> serde_json::Value {
588        json!({
589            "type": "object",
590            "required": ["reason"],
591            "properties": {
592                "module_id":         {"type": "string"},
593                "path_filter":       {"type": "string"},
594                "reload_dependents": {"type": "boolean", "default": false},
595                "reload_config":     {"type": "boolean", "default": false},
596                "reason":            {"type": "string"}
597            }
598        })
599    }
600
601    fn output_schema(&self) -> serde_json::Value {
602        json!({
603            "type": "object",
604            "properties": {
605                "success":            {"type": "boolean"},
606                "module_id":          {"type": ["string", "null"]},
607                "previous_version":   {"type": "string"},
608                "new_version":        {"type": "string"},
609                "reload_duration_ms": {"type": "number"},
610                "reloaded_modules":   {"type": "array", "items": {"type": "string"}}
611            }
612        })
613    }
614
615    async fn execute(
616        &self,
617        inputs: serde_json::Value,
618        ctx: &Context<serde_json::Value>,
619    ) -> Result<serde_json::Value, ModuleError> {
620        let reason = require_string(&inputs, "reason")?;
621
622        let module_id_input = inputs
623            .get("module_id")
624            .filter(|v| !v.is_null())
625            .and_then(|v| v.as_str())
626            .filter(|s| !s.is_empty());
627        let path_filter_input = inputs
628            .get("path_filter")
629            .filter(|v| !v.is_null())
630            .and_then(|v| v.as_str())
631            .filter(|s| !s.is_empty());
632
633        if module_id_input.is_some() && path_filter_input.is_some() {
634            return Err(ModuleError::new(
635                ErrorCode::ModuleReloadConflict,
636                "'module_id' and 'path_filter' are mutually exclusive",
637            ));
638        }
639
640        let reload_config_flag = inputs
641            .get("reload_config")
642            .and_then(serde_json::Value::as_bool)
643            .unwrap_or(false);
644
645        // Issue #45.5: when reload_config is set, refresh static configuration
646        // from disk. This runs BEFORE module reload so any module that re-reads
647        // config during its on_resume sees the fresh values.
648        let mut config_reloaded = false;
649        if reload_config_flag {
650            if let Some(cfg_handle) = self.config.as_ref() {
651                let mut cfg = cfg_handle.lock().await;
652                match cfg.reload_from_disk() {
653                    Ok(()) => {
654                        config_reloaded = true;
655                        let timestamp = chrono::Utc::now().to_rfc3339();
656                        emit_event(
657                            &self.emitter,
658                            "apcore.config.reloaded",
659                            "system.control.reload_module",
660                            &timestamp,
661                            json!({"reason": reason}),
662                        )
663                        .await;
664                        tracing::info!(reason = %reason, "Config reloaded from disk");
665                    }
666                    Err(e) => {
667                        tracing::warn!(
668                            error = %e.message,
669                            "reload_config: Config::reload_from_disk failed (continuing)"
670                        );
671                    }
672                }
673            } else {
674                tracing::warn!(
675                    "reload_config: requested but no Config bound to ReloadModule \
676                     (use ReloadModule::with_config to enable)"
677                );
678            }
679        }
680
681        if let Some(filter) = path_filter_input {
682            let mut result = self.execute_bulk(filter.to_string(), &reason, ctx).await?;
683            if let Some(obj) = result.as_object_mut() {
684                obj.insert("config_reloaded".to_string(), json!(config_reloaded));
685            }
686            return Ok(result);
687        }
688
689        // If only reload_config was requested without a module target, return
690        // a config-only success response. This makes `reload_config: true`
691        // usable on its own without forcing the caller to nominate a module.
692        if module_id_input.is_none() && reload_config_flag {
693            return Ok(json!({
694                "success": true,
695                "module_id": serde_json::Value::Null,
696                "config_reloaded": config_reloaded,
697            }));
698        }
699
700        let module_id = module_id_input.ok_or_else(|| {
701            ModuleError::new(
702                ErrorCode::GeneralInvalidInput,
703                "'module_id', 'path_filter', or 'reload_config' is required",
704            )
705        })?;
706
707        let mut result = self
708            .execute_single(module_id.to_string(), &reason, ctx)
709            .await?;
710        if let Some(obj) = result.as_object_mut() {
711            obj.insert("config_reloaded".to_string(), json!(config_reloaded));
712        }
713        Ok(result)
714    }
715}
716
717// ---------------------------------------------------------------------------
718// ToggleFeatureModule (F19) — runtime enable/disable with optional persistence
719// ---------------------------------------------------------------------------
720
721/// Disable or enable a module without unloading it from the Registry (F19).
722pub struct ToggleFeatureModule {
723    registry: Arc<Registry>,
724    emitter: Arc<Mutex<EventEmitter>>,
725    toggle_state: Arc<ToggleState>,
726    overrides_path: Option<PathBuf>,
727    overrides_store: Option<Arc<dyn OverridesStore>>,
728    audit_store: Option<Arc<dyn AuditStore>>,
729}
730
731impl ToggleFeatureModule {
732    pub fn new(
733        registry: Arc<Registry>,
734        emitter: Arc<Mutex<EventEmitter>>,
735        toggle_state: Arc<ToggleState>,
736    ) -> Self {
737        Self {
738            registry,
739            emitter,
740            toggle_state,
741            overrides_path: None,
742            overrides_store: None,
743            audit_store: None,
744        }
745    }
746
747    #[must_use]
748    pub fn with_overrides_path(mut self, overrides_path: Option<PathBuf>) -> Self {
749        self.overrides_path = overrides_path;
750        self
751    }
752
753    /// Bind a pluggable [`OverridesStore`] for persistence. Takes precedence
754    /// over `overrides_path` when both are set.
755    #[must_use]
756    pub fn with_overrides_store(
757        mut self,
758        overrides_store: Option<Arc<dyn OverridesStore>>,
759    ) -> Self {
760        self.overrides_store = overrides_store;
761        self
762    }
763
764    #[must_use]
765    pub fn with_audit_store(mut self, audit_store: Option<Arc<dyn AuditStore>>) -> Self {
766        self.audit_store = audit_store;
767        self
768    }
769}
770
771#[async_trait]
772impl Module for ToggleFeatureModule {
773    fn description(&self) -> &'static str {
774        "Disable or enable a module without unloading it"
775    }
776
777    fn input_schema(&self) -> serde_json::Value {
778        json!({
779            "type": "object",
780            "required": ["module_id", "enabled", "reason"],
781            "properties": {
782                "module_id": {"type": "string"},
783                "enabled":   {"type": "boolean"},
784                "reason":    {"type": "string"}
785            }
786        })
787    }
788
789    fn output_schema(&self) -> serde_json::Value {
790        json!({
791            "type": "object",
792            "properties": {
793                "success":   {"type": "boolean"},
794                "module_id": {"type": "string"},
795                "enabled":   {"type": "boolean"}
796            }
797        })
798    }
799
800    async fn execute(
801        &self,
802        inputs: serde_json::Value,
803        ctx: &Context<serde_json::Value>,
804    ) -> Result<serde_json::Value, ModuleError> {
805        let module_id = require_string(&inputs, "module_id")?;
806        let reason = require_string(&inputs, "reason")?;
807        let enabled = inputs
808            .get("enabled")
809            .and_then(serde_json::Value::as_bool)
810            .ok_or_else(|| {
811                ModuleError::new(
812                    ErrorCode::GeneralInvalidInput,
813                    "'enabled' is required and must be a boolean",
814                )
815            })?;
816
817        if !self.registry.has(&module_id) {
818            return Err(ModuleError::new(
819                ErrorCode::ModuleNotFound,
820                format!("Module '{module_id}' not found"),
821            ));
822        }
823
824        let before_enabled = !self.toggle_state.is_disabled(&module_id);
825
826        // Flip the descriptor's `enabled` flag in the Registry first — that's
827        // the fallible operation. Only after it succeeds do we update the
828        // infallible `ToggleState`. This ordering guarantees the two stores
829        // cannot diverge on Registry rejection.
830        if enabled {
831            self.registry.enable(&module_id)?;
832            self.toggle_state.enable(&module_id);
833        } else {
834            self.registry.disable(&module_id)?;
835            self.toggle_state.disable(&module_id);
836        }
837
838        let toggle_key = format!("toggle.{module_id}");
839        let toggle_value = serde_json::Value::Bool(enabled);
840        if let Some(store) = self.overrides_store.as_ref() {
841            if let Err(e) = persist_one(store.as_ref(), &toggle_key, &toggle_value).await {
842                tracing::warn!(error = %e, key = %toggle_key, "OverridesStore persist failed");
843            }
844        } else if let Some(path) = self.overrides_path.as_deref() {
845            write_override(path, &toggle_key, &toggle_value);
846        }
847
848        let timestamp = chrono::Utc::now().to_rfc3339();
849        // Issue #45.2 — augment with caller_id / identity from the Context.
850        // Cross-language parity: Python/TS include `module_id` directly in the
851        // event data payload (alongside the outer ApCoreEvent.module_id field)
852        // so subscribers that only look at `data` still see the affected target.
853        let event_data = augment_with_context_identity(
854            json!({
855                "module_id": module_id,
856                "enabled": enabled,
857                "reason": reason,
858            }),
859            ctx,
860        );
861        emit_event(
862            &self.emitter,
863            "apcore.module.toggled",
864            &module_id,
865            &timestamp,
866            event_data,
867        )
868        .await;
869
870        tracing::info!(
871            module_id = %module_id,
872            enabled = %enabled,
873            reason = %reason,
874            "Module toggled"
875        );
876
877        let entry = build_audit_entry(
878            AuditAction::ToggleFeature,
879            &module_id,
880            ctx,
881            AuditChange {
882                before: json!(before_enabled),
883                after: json!(enabled),
884            },
885        );
886        record_audit(self.audit_store.as_ref(), entry).await;
887
888        Ok(json!({
889            "success": true,
890            "module_id": module_id,
891            "enabled": enabled,
892        }))
893    }
894}