Skip to main content

bijux_cli/features/plugins/
registry.rs

1#![forbid(unsafe_code)]
2
3use std::collections::BTreeSet;
4use std::fs;
5use std::io::Write as _;
6use std::path::{Path, PathBuf};
7use std::time::Duration;
8
9use sha2::{Digest, Sha256};
10
11use super::constants::REGISTRY_VERSION;
12use super::diagnostics::load_time_diagnostics;
13use super::entrypoint::{
14    installed_manifest_root, is_executable, resolve_delegated_entrypoint,
15    resolve_external_exec_entrypoint,
16};
17use super::errors::PluginError;
18use super::manifest::{is_version_compatible, parse_manifest_v2, validate_manifest};
19use super::models::{
20    InstallPluginRequest, PluginDoctorReport, PluginLoadEntry, PluginOriginMetadata, PluginRecord,
21    PluginRegistry,
22};
23use crate::api::version::runtime_semver;
24use crate::contracts::PluginKind;
25use crate::infrastructure::fs_store::atomic_write_text;
26
27fn checksum_sha256(input: &str) -> String {
28    let digest = Sha256::digest(input.as_bytes());
29    format!("{digest:x}")
30}
31
32/// Load plugin registry from disk.
33pub fn load_registry(path: &Path) -> Result<PluginRegistry, PluginError> {
34    if !path.exists() {
35        return Ok(PluginRegistry::default());
36    }
37
38    let text = fs::read_to_string(path)?;
39    let parsed: PluginRegistry =
40        serde_json::from_str(&text).map_err(|_| PluginError::RegistryCorrupted)?;
41    if parsed.version != REGISTRY_VERSION {
42        return Err(PluginError::RegistryCorrupted);
43    }
44    Ok(parsed)
45}
46
47/// Save plugin registry atomically.
48pub fn save_registry(path: &Path, registry: &PluginRegistry) -> Result<(), PluginError> {
49    if let Some(parent) = path.parent() {
50        fs::create_dir_all(parent)?;
51    }
52
53    let mut rendered = serde_json::to_string_pretty(registry)?;
54    rendered.push('\n');
55    atomic_write_text(path, &rendered)?;
56    Ok(())
57}
58
59fn backup_registry(path: &Path) -> Result<Option<PathBuf>, PluginError> {
60    if !path.exists() {
61        return Ok(None);
62    }
63    let backup = path.with_extension("bak");
64    fs::copy(path, &backup)?;
65    Ok(Some(backup))
66}
67
/// Guard that deletes the lock file at `path` when it goes out of scope.
#[derive(Debug)]
struct RegistryLockGuard {
    /// Location of the lock file removed on drop.
    path: PathBuf,
}

impl Drop for RegistryLockGuard {
    /// Remove the lock file; a failed removal is deliberately ignored.
    fn drop(&mut self) {
        fs::remove_file(&self.path).ok();
    }
}
78
/// Derive the lock-file path for a registry: `path` with its extension
/// replaced by (or, if it has none, extended with) `lock`.
fn lock_path(path: &Path) -> PathBuf {
    let mut lock = path.to_path_buf();
    lock.set_extension("lock");
    lock
}
82
/// Age after which a lock file counts as stale.
///
/// Taken from `BIJUX_PLUGIN_REGISTRY_LOCK_STALE_AFTER_SECONDS` when that
/// variable is set and parses as a `u64`; otherwise 300 seconds.
fn stale_lock_timeout() -> Duration {
    const DEFAULT_SECONDS: u64 = 300;

    let seconds = match std::env::var("BIJUX_PLUGIN_REGISTRY_LOCK_STALE_AFTER_SECONDS") {
        Ok(raw) => raw.parse::<u64>().unwrap_or(DEFAULT_SECONDS),
        Err(_) => DEFAULT_SECONDS,
    };
    Duration::from_secs(seconds)
}
90
/// Read the owning process id recorded in a lock file.
///
/// Looks at the first line starting with `pid=` and parses the trimmed
/// remainder as a `u32`. Returns `None` when the file cannot be read, no such
/// line exists, or that first line's value does not parse.
fn lock_owner_pid(lock: &Path) -> Option<u32> {
    let content = fs::read_to_string(lock).ok()?;
    for line in content.lines() {
        if let Some(raw) = line.strip_prefix("pid=") {
            // Only the first `pid=` line counts, even if it fails to parse.
            return raw.trim().parse::<u32>().ok();
        }
    }
    None
}
98
/// Probe whether `pid` refers to a running process by running `kill -0 <pid>`
/// with its output discarded.
///
/// Returns `true` only when the command could be started and exited
/// successfully.
#[cfg(unix)]
fn process_is_alive(pid: u32) -> bool {
    use std::process::{Command, Stdio};

    let probe = Command::new("kill")
        .arg("-0")
        .arg(pid.to_string())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}

/// On non-Unix targets no probe is performed: every process is reported as
/// alive.
#[cfg(not(unix))]
fn process_is_alive(_pid: u32) -> bool {
    true
}
113
114fn lock_is_stale(lock: &Path) -> bool {
115    if let Some(pid) = lock_owner_pid(lock) {
116        if !process_is_alive(pid) {
117            return true;
118        }
119    }
120
121    let Ok(metadata) = fs::metadata(lock) else {
122        return false;
123    };
124    let Ok(modified) = metadata.modified() else {
125        return false;
126    };
127    modified.elapsed().is_ok_and(|elapsed| elapsed > stale_lock_timeout())
128}
129
130fn acquire_registry_lock(path: &Path) -> Result<RegistryLockGuard, PluginError> {
131    if let Some(parent) = path.parent() {
132        fs::create_dir_all(parent)?;
133    }
134    let lock = lock_path(path);
135    for attempt in 0..2 {
136        match fs::OpenOptions::new().create_new(true).write(true).open(&lock) {
137            Ok(mut file) => {
138                let _ = writeln!(file, "pid={}", std::process::id());
139                let _ = file.sync_all();
140                return Ok(RegistryLockGuard { path: lock });
141            }
142            Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists && attempt == 0 => {
143                if lock_is_stale(&lock) {
144                    match fs::remove_file(&lock) {
145                        Ok(()) => continue,
146                        Err(remove_error)
147                            if remove_error.kind() == std::io::ErrorKind::NotFound =>
148                        {
149                            continue
150                        }
151                        Err(remove_error) => return Err(remove_error.into()),
152                    }
153                }
154                return Err(PluginError::RegistryLocked(lock));
155            }
156            Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {
157                return Err(PluginError::RegistryLocked(lock));
158            }
159            Err(error) => return Err(error.into()),
160        }
161    }
162    Err(PluginError::RegistryLocked(lock))
163}
164
165fn restore_registry(path: &Path, backup: Option<PathBuf>) -> Result<(), PluginError> {
166    if let Some(backup_path) = backup {
167        replace_file(&backup_path, path)?;
168    }
169    Ok(())
170}
171
/// Delete the backup file, if one was taken. Removal errors are ignored.
fn cleanup_backup(backup: Option<PathBuf>) {
    let Some(leftover) = backup else {
        return;
    };
    fs::remove_file(leftover).ok();
}
177
/// Move `source` onto `destination` with `fs::rename`.
///
/// On Windows builds an existing `destination` is removed first; a "not
/// found" result from that removal is tolerated, any other removal error is
/// returned.
fn replace_file(source: &Path, destination: &Path) -> Result<(), std::io::Error> {
    #[cfg(windows)]
    {
        if destination.exists() {
            if let Err(error) = fs::remove_file(destination) {
                if error.kind() != std::io::ErrorKind::NotFound {
                    return Err(error);
                }
            }
        }
    }

    fs::rename(source, destination)
}
192
193/// Update plugin registry atomically with rollback support.
194pub fn update_registry<F>(path: &Path, mutator: F) -> Result<PluginRegistry, PluginError>
195where
196    F: FnOnce(&mut PluginRegistry) -> Result<(), PluginError>,
197{
198    let _lock = acquire_registry_lock(path)?;
199    let backup = backup_registry(path)?;
200    let mut registry = load_registry(path)?;
201
202    if let Err(error) = mutator(&mut registry) {
203        restore_registry(path, backup)?;
204        return Err(error);
205    }
206
207    if let Err(error) = save_registry(path, &registry) {
208        restore_registry(path, backup)?;
209        return Err(error);
210    }
211
212    cleanup_backup(backup);
213    Ok(registry)
214}
215
216fn ensure_aliases_do_not_conflict(
217    registry: &PluginRegistry,
218    candidate: &PluginRecord,
219) -> Result<(), PluginError> {
220    let mut existing_aliases = BTreeSet::new();
221    let mut existing_namespaces = BTreeSet::new();
222    for plugin in registry.plugins.values() {
223        existing_namespaces.insert(plugin.manifest.namespace.0.to_ascii_lowercase());
224        for alias in &plugin.manifest.aliases {
225            existing_aliases.insert(alias.to_ascii_lowercase());
226        }
227    }
228
229    if existing_aliases.contains(&candidate.manifest.namespace.0.to_ascii_lowercase()) {
230        return Err(PluginError::AliasConflict(candidate.manifest.namespace.0.clone()));
231    }
232
233    for alias in &candidate.manifest.aliases {
234        let normalized = alias.to_ascii_lowercase();
235        if existing_aliases.contains(&normalized) || existing_namespaces.contains(&normalized) {
236            return Err(PluginError::AliasConflict(alias.clone()));
237        }
238    }
239
240    Ok(())
241}
242
243fn resolve_namespace_reference(
244    registry: &PluginRegistry,
245    reference: &str,
246) -> Result<String, PluginError> {
247    let normalized = reference.to_ascii_lowercase();
248    if let Some((namespace, _)) =
249        registry.plugins.iter().find(|(namespace, _)| namespace.to_ascii_lowercase() == normalized)
250    {
251        return Ok(namespace.clone());
252    }
253
254    registry
255        .plugins
256        .iter()
257        .find(|(_, record)| {
258            record.manifest.aliases.iter().any(|alias| alias.to_ascii_lowercase() == normalized)
259        })
260        .map(|(namespace, _)| namespace.clone())
261        .ok_or_else(|| PluginError::PluginNotFound(reference.to_string()))
262}
263
264fn validate_local_entrypoint(record: &PluginRecord) -> Result<(), PluginError> {
265    match record.manifest.kind {
266        PluginKind::Delegated | PluginKind::Python => {
267            if let Some(manifest_root) =
268                installed_manifest_root(record.manifest_path.as_deref(), &record.source)
269            {
270                let candidates = super::entrypoint::delegated_entrypoint_candidates(
271                    &manifest_root,
272                    &record.manifest.entrypoint,
273                );
274                let resolved = resolve_delegated_entrypoint(
275                    record.manifest_path.as_deref(),
276                    &record.source,
277                    &record.manifest.entrypoint,
278                );
279                if resolved.is_none() {
280                    if let Some(path) = candidates.into_iter().next() {
281                        return Err(PluginError::MissingEntrypointPath {
282                            kind: record.manifest.kind,
283                            path,
284                        });
285                    }
286                }
287            }
288        }
289        PluginKind::ExternalExec => {
290            let entrypoint_path = Path::new(&record.manifest.entrypoint);
291            if installed_manifest_root(record.manifest_path.as_deref(), &record.source).is_some()
292                || entrypoint_path.is_absolute()
293            {
294                let path = resolve_external_exec_entrypoint(
295                    record.manifest_path.as_deref(),
296                    &record.source,
297                    &record.manifest.entrypoint,
298                );
299                if !path.exists() {
300                    return Err(PluginError::MissingEntrypointPath {
301                        kind: record.manifest.kind,
302                        path,
303                    });
304                }
305                if !is_executable(&path)? {
306                    return Err(PluginError::NonExecutableEntrypoint { path });
307                }
308            }
309        }
310        PluginKind::Native => {}
311    }
312
313    Ok(())
314}
315
316/// Install plugin into registry from manifest text.
317pub fn install_plugin(
318    registry_path: &Path,
319    request: InstallPluginRequest,
320    host_version: &str,
321    reserved_namespaces: &[&str],
322) -> Result<PluginRecord, PluginError> {
323    let manifest_checksum_sha256 = checksum_sha256(&request.manifest_text);
324    let manifest = parse_manifest_v2(&request.manifest_text)?;
325    let validated = validate_manifest(manifest, host_version, reserved_namespaces)?;
326
327    let namespace = validated.manifest.namespace.0.clone();
328    let source = request.source;
329    let trust_level = request.trust_level;
330    let record = PluginRecord {
331        manifest: validated.manifest,
332        state: crate::contracts::PluginLifecycleState::Installed,
333        source,
334        manifest_path: request.manifest_path,
335        trust_level,
336        manifest_checksum_sha256,
337    };
338    validate_local_entrypoint(&record)?;
339
340    update_registry(registry_path, |registry| {
341        if registry.plugins.contains_key(&namespace) {
342            return Err(PluginError::NamespaceConflict(namespace.clone()));
343        }
344        ensure_aliases_do_not_conflict(registry, &record)?;
345        registry.plugins.insert(namespace.clone(), record.clone());
346        Ok(())
347    })?;
348
349    Ok(record)
350}
351
352/// Remove plugin from registry.
353pub fn uninstall_plugin(registry_path: &Path, namespace: &str) -> Result<(), PluginError> {
354    update_registry(registry_path, |registry| {
355        let resolved = resolve_namespace_reference(registry, namespace)?;
356        if registry.plugins.remove(&resolved).is_none() {
357            return Err(PluginError::PluginNotFound(namespace.to_string()));
358        }
359        Ok(())
360    })?;
361    Ok(())
362}
363
364fn set_plugin_state(
365    registry_path: &Path,
366    namespace: &str,
367    state: crate::contracts::PluginLifecycleState,
368) -> Result<PluginRecord, PluginError> {
369    let mut updated: Option<PluginRecord> = None;
370
371    update_registry(registry_path, |registry| {
372        let resolved = resolve_namespace_reference(registry, namespace)?;
373        let plugin = registry
374            .plugins
375            .get_mut(&resolved)
376            .ok_or_else(|| PluginError::PluginNotFound(namespace.to_string()))?;
377        if state == crate::contracts::PluginLifecycleState::Enabled
378            && plugin.state == crate::contracts::PluginLifecycleState::Broken
379        {
380            return Err(PluginError::InvalidField("cannot enable broken plugin".to_string()));
381        }
382        plugin.state = state;
383        updated = Some(plugin.clone());
384        Ok(())
385    })?;
386
387    updated.ok_or_else(|| PluginError::PluginNotFound(namespace.to_string()))
388}
389
390/// Enable installed plugin.
391pub fn enable_plugin(registry_path: &Path, namespace: &str) -> Result<PluginRecord, PluginError> {
392    set_plugin_state(registry_path, namespace, crate::contracts::PluginLifecycleState::Enabled)
393}
394
395/// Disable installed plugin.
396pub fn disable_plugin(registry_path: &Path, namespace: &str) -> Result<PluginRecord, PluginError> {
397    set_plugin_state(registry_path, namespace, crate::contracts::PluginLifecycleState::Disabled)
398}
399
400/// Inspect plugin by namespace.
401pub fn inspect_plugin(registry_path: &Path, namespace: &str) -> Result<PluginRecord, PluginError> {
402    let registry = load_registry(registry_path)?;
403    let resolved = resolve_namespace_reference(&registry, namespace)?;
404    registry
405        .plugins
406        .get(&resolved)
407        .cloned()
408        .ok_or_else(|| PluginError::PluginNotFound(namespace.to_string()))
409}
410
411/// Build plugin-origin metadata from registry contents.
412pub fn plugin_origin_metadata(
413    registry_path: &Path,
414) -> Result<Vec<PluginOriginMetadata>, PluginError> {
415    let registry = load_registry(registry_path)?;
416    Ok(registry
417        .plugins
418        .into_iter()
419        .map(|(namespace, record)| PluginOriginMetadata {
420            namespace,
421            source: record.source,
422            trust_level: record.trust_level,
423        })
424        .collect())
425}
426
427/// List all plugins deterministically by namespace.
428pub fn list_plugins(registry_path: &Path) -> Result<Vec<PluginRecord>, PluginError> {
429    let registry = load_registry(registry_path)?;
430    Ok(registry.plugins.into_values().collect())
431}
432
433/// Produce plugin health report.
434pub fn plugin_doctor(registry_path: &Path) -> Result<PluginDoctorReport, PluginError> {
435    let registry = load_registry(registry_path)?;
436    let diagnostics = load_time_diagnostics(registry_path, runtime_semver())?;
437    let mut broken = diagnostics
438        .iter()
439        .filter(|diagnostic| diagnostic.severity == "error")
440        .map(|diagnostic| diagnostic.namespace.clone())
441        .collect::<Vec<_>>();
442    let mut incompatible = diagnostics
443        .iter()
444        .filter(|diagnostic| diagnostic.severity == "warning")
445        .map(|diagnostic| diagnostic.namespace.clone())
446        .collect::<Vec<_>>();
447    broken.sort();
448    broken.dedup();
449    incompatible.sort();
450    incompatible.dedup();
451
452    Ok(PluginDoctorReport { installed: registry.plugins.len(), broken, incompatible })
453}
454
455/// Check plugin compatibility against host version without mutating registry.
456#[allow(dead_code)]
457pub fn compatibility_check(
458    manifest: &crate::contracts::PluginManifestV2,
459    host_version: &str,
460) -> Result<bool, PluginError> {
461    let _ = semver::VersionReq::parse(&format!("={host_version}"))
462        .map_err(|_| PluginError::InvalidField("host_version".to_string()))?;
463    is_version_compatible(&manifest.compatibility, host_version)
464}
465
466/// Return deterministic plugin load order contract.
467#[allow(dead_code)]
468pub fn plugin_load_order(registry_path: &Path) -> Result<Vec<PluginLoadEntry>, PluginError> {
469    let registry = load_registry(registry_path)?;
470    let mut items: Vec<PluginLoadEntry> = registry
471        .plugins
472        .iter()
473        .map(|(namespace, record)| PluginLoadEntry {
474            namespace: namespace.clone(),
475            state: record.state,
476        })
477        .collect();
478
479    items.sort_by(|left, right| {
480        let left_rank = state_rank(left.state);
481        let right_rank = state_rank(right.state);
482        left_rank.cmp(&right_rank).then_with(|| left.namespace.cmp(&right.namespace))
483    });
484
485    Ok(items)
486}
487
488#[allow(dead_code)]
489fn state_rank(state: crate::contracts::PluginLifecycleState) -> u8 {
490    match state {
491        crate::contracts::PluginLifecycleState::Enabled => 0,
492        crate::contracts::PluginLifecycleState::Installed
493        | crate::contracts::PluginLifecycleState::Validated => 1,
494        crate::contracts::PluginLifecycleState::Disabled => 2,
495        crate::contracts::PluginLifecycleState::Discovered => 3,
496        crate::contracts::PluginLifecycleState::Incompatible => 4,
497        crate::contracts::PluginLifecycleState::Broken => 5,
498    }
499}
500
#[cfg(test)]
mod tests {
    use std::fs;
    use std::sync::Arc;
    use std::thread;

    use tempfile::TempDir;

    use super::{
        load_registry, lock_path, save_registry, update_registry, PluginError, PluginRegistry,
    };

    /// Eight threads repeatedly saving the default registry must leave a file
    /// that still loads as the default registry.
    #[test]
    fn concurrent_registry_writes_keep_registry_parseable() {
        let temp = TempDir::new().expect("tempdir");
        let path = Arc::new(temp.path().join("registry.json"));

        let mut writers = Vec::new();
        for _ in 0..8 {
            let path = Arc::clone(&path);
            writers.push(thread::spawn(move || {
                for _ in 0..40 {
                    save_registry(path.as_path(), &PluginRegistry::default())
                        .expect("save registry");
                }
            }));
        }

        for writer in writers {
            writer.join().expect("join writer");
        }

        let loaded = load_registry(path.as_path()).expect("load registry");
        assert_eq!(loaded, PluginRegistry::default());
    }

    /// A fresh lock file without a pid line is neither owned by a dead process
    /// nor old enough to be stale, so the update must be refused.
    #[test]
    fn update_registry_rejects_when_lock_is_held() {
        let temp = TempDir::new().expect("tempdir");
        let path = temp.path().join("registry.json");
        save_registry(path.as_path(), &PluginRegistry::default()).expect("seed");

        let lock = lock_path(path.as_path());
        fs::write(&lock, "held\n").expect("seed lock");

        let err = update_registry(path.as_path(), |_| Ok(())).expect_err("lock should block write");
        assert!(matches!(err, PluginError::RegistryLocked(_)));
    }

    /// A lock naming a process that no longer exists must be reclaimed.
    #[test]
    #[cfg(unix)]
    fn update_registry_recovers_from_stale_dead_pid_lock() {
        let temp = TempDir::new().expect("tempdir");
        let path = temp.path().join("registry.json");
        save_registry(path.as_path(), &PluginRegistry::default()).expect("seed");

        // Use the pid of a child that has already exited and been reaped. A
        // hard-coded pid (such as 999999) can belong to a live process on hosts
        // configured with a large pid range, which would make this test flaky.
        let mut child =
            std::process::Command::new("true").spawn().expect("spawn short-lived child");
        let dead_pid = child.id();
        child.wait().expect("reap child");

        let lock = lock_path(path.as_path());
        fs::write(&lock, format!("pid={dead_pid}\n")).expect("seed stale lock");

        update_registry(path.as_path(), |_| Ok(())).expect("stale lock should be reclaimed");
        assert!(!lock.exists(), "stale lock should be removed after successful update");
    }
}
563}