Skip to main content

harmont_cli/plugin/
host_fns.rs

1//! All host functions exported to plugins. The exhaustive list lives
2//! in the design spec §3.3; this file is the single source of truth
3//! for which fn names exist and what types they accept.
4
5// `extism::host_fn!` expands to plain `pub fn` items whose bodies do
6// `plugin.memory_get_val(&inputs[i])` for each arg. The macro produces
7// expressions clippy wants to grumble about (needless pass-by-value of
8// `Json<T>` newtypes; non-erroring `Ok(())` wrappers); we accept the
9// macro idiom rather than fight it at every call site. Scope is this file.
10#![allow(clippy::needless_pass_by_value)]
11#![allow(clippy::unnecessary_wraps)]
12#![allow(clippy::wildcard_imports)]
13#![allow(clippy::missing_errors_doc)]
14// extism wraps every host-fn arg/ret through `MemoryHandle`, which is a
15// 64-bit pointer; cast-precision warnings are not actionable here.
16#![allow(clippy::cast_possible_truncation)]
17#![allow(clippy::cast_sign_loss)]
18// `all()` is intentionally a single Vec literal — splitting it would obscure
19// the 1:1 mapping between HOST_FN_NAMES and the constructed Function set.
20#![allow(clippy::too_many_lines)]
21// The tiny `pty` helper sits adjacent to its only call site inside `all()`;
22// hoisting it to module scope would force readers to jump out of the table.
23#![allow(clippy::items_after_statements)]
24// Several `*_impl` fns are no-op stubs that could be `const fn` today
25// but will gain side-effecting bodies in Plan 2; flipping them now would
26// mean another churn pass.
27#![allow(clippy::missing_const_for_fn)]
28// `GLOBAL.lock().map(|s| s.cancel).unwrap_or(false)` reads as
29// "treat host-fn failure as 'not cancelled'"; collapsing to `is_ok_and`
30// would obscure the fallback intent.
31#![allow(clippy::map_unwrap_or)]
32// `Lazy::new(|| …)` is the once_cell idiom we use across the workspace;
33// the `LazyLock` migration is a separate sweep.
34#![allow(clippy::incompatible_msrv)]
35#![allow(clippy::non_std_lazy_statics)]
36// `GLOBAL.lock()` returns a guard with significant `Drop`; clippy flags
37// holding it across the `.get(key).cloned()` call. The lock IS the
38// scrutinee on purpose — we want a coherent read.
39#![allow(clippy::significant_drop_in_scrutinee)]
40#![allow(clippy::significant_drop_tightening)]
41
42use std::collections::{BTreeMap, HashMap};
43use std::sync::{Arc, Mutex};
44
45use extism::convert::Json;
46use extism::{Function, PTR, UserData, ValType, host_fn};
47use hm_plugin_protocol::host_abi::{
48    ArchiveReadArgs, CallbackData, KeyringArgs, KeyringSetArgs, KvScope, Level, LoopbackHandle,
49    LoopbackRecvArgs, SocketHandle, SocketReadArgs, SocketWriteArgs, TtyConfirmArgs, TtyPromptArgs,
50};
51use hm_plugin_protocol::{
52    ArchiveId, BuildEvent, DockerCommitArgs, DockerExecArgs, DockerExtractArgs, DockerStartArgs,
53    StdStream,
54};
55use once_cell::sync::Lazy;
56
/// Names of every host function exported to plugins.
///
/// Plugin manifests are validated against this set at load time. The
/// entries appear in the same order as the `Function` table built by
/// [`all`]; add new names to both places together.
pub const HOST_FN_NAMES: &[&str] = &[
    // Logging and build events.
    "hm_log",
    "hm_emit_step_log",
    "hm_emit_event",
    // Key/value storage.
    "hm_kv_get",
    "hm_kv_set",
    // Archives and config files.
    "hm_archive_read",
    "hm_archive_total_size",
    "hm_fs_read_config",
    // Sockets.
    "hm_unix_socket_connect",
    "hm_socket_write",
    "hm_socket_read",
    "hm_socket_close",
    // Credentials.
    "hm_keyring_get",
    "hm_keyring_set",
    "hm_keyring_delete",
    // User interaction.
    "hm_tty_prompt",
    "hm_tty_confirm",
    "hm_browser_open",
    // Loopback callback listener.
    "hm_spawn_loopback",
    "hm_loopback_recv",
    // Cancellation.
    "hm_should_cancel",
    // Docker.
    "hm_docker_ping",
    "hm_docker_image_exists",
    "hm_docker_pull",
    "hm_docker_start_container",
    "hm_docker_extract_workspace",
    "hm_docker_exec",
    "hm_docker_commit",
    "hm_docker_remove_image",
    "hm_docker_stop_remove",
    // Raw process output.
    "hm_write_stdout",
    "hm_write_stderr",
];
93
94// ─── host_fn! declarations ──────────────────────────────────────────────────
95//
96// Each `host_fn!` invocation expands to a plain `pub fn name(...)` matching
97// extism's host-fn signature. We wire each into a `Function` value below.
98
99host_fn!(pub _hm_log(_user_data: (); level: Json<Level>, msg: String) {
100    let Json(level) = level;
101    log_impl(level, &msg);
102    Ok(())
103});
104
105host_fn!(pub _hm_emit_step_log(_user_data: (); stream: Json<StdStream>, bytes: Vec<u8>) {
106    let Json(stream) = stream;
107    emit_step_log_impl(stream, &bytes);
108    Ok(())
109});
110
111host_fn!(pub _hm_emit_event(_user_data: (); event: Json<BuildEvent>) {
112    let Json(event) = event;
113    emit_event_impl(event);
114    Ok(())
115});
116
117host_fn!(pub _hm_kv_get(_user_data: (); scope: Json<KvScope>, key: String) -> Json<Option<Vec<u8>>> {
118    let Json(scope) = scope;
119    Ok(Json(kv_get_impl(scope, &key)))
120});
121
122host_fn!(pub _hm_kv_set(_user_data: (); scope: Json<KvScope>, key: String, val: Vec<u8>) {
123    let Json(scope) = scope;
124    kv_set_impl(scope, &key, val);
125    Ok(())
126});
127
128host_fn!(pub _hm_archive_read(_user_data: (); args: Json<ArchiveReadArgs>) -> Vec<u8> {
129    let Json(args) = args;
130    Ok(archive_read_impl(args))
131});
132
133host_fn!(pub _hm_archive_total_size(_user_data: (); id: Json<ArchiveId>) -> u64 {
134    let Json(id) = id;
135    Ok(archive_total_size_impl(id))
136});
137
138host_fn!(pub _hm_fs_read_config(_user_data: (); rel_path: String) -> Json<Option<Vec<u8>>> {
139    Ok(Json(fs_read_config_impl(&rel_path)))
140});
141
142host_fn!(pub _hm_unix_socket_connect(_user_data: (); path: String) -> Json<SocketHandle> {
143    Ok(Json(unix_socket_connect_impl(&path)))
144});
145
146host_fn!(pub _hm_socket_write(_user_data: (); args: Json<SocketWriteArgs>) -> u64 {
147    let Json(args) = args;
148    Ok(socket_write_impl(args))
149});
150
151host_fn!(pub _hm_socket_read(_user_data: (); args: Json<SocketReadArgs>) -> Vec<u8> {
152    let Json(args) = args;
153    Ok(socket_read_impl(args))
154});
155
156host_fn!(pub _hm_socket_close(_user_data: (); h: Json<SocketHandle>) {
157    let Json(h) = h;
158    socket_close_impl(h);
159    Ok(())
160});
161
162host_fn!(pub _hm_keyring_get(_user_data: (); args: Json<KeyringArgs>) -> Json<Option<String>> {
163    let Json(args) = args;
164    Ok(Json(keyring_get_impl(&args.service, &args.account)))
165});
166
167host_fn!(pub _hm_keyring_set(_user_data: (); args: Json<KeyringSetArgs>) {
168    let Json(args) = args;
169    keyring_set_impl(&args.service, &args.account, &args.secret);
170    Ok(())
171});
172
173host_fn!(pub _hm_keyring_delete(_user_data: (); args: Json<KeyringArgs>) {
174    let Json(args) = args;
175    keyring_delete_impl(&args.service, &args.account);
176    Ok(())
177});
178
179host_fn!(pub _hm_tty_prompt(_user_data: (); args: Json<TtyPromptArgs>) -> String {
180    let Json(args) = args;
181    Ok(tty_prompt_impl(&args.msg, args.mask))
182});
183
184host_fn!(pub _hm_tty_confirm(_user_data: (); args: Json<TtyConfirmArgs>) -> u32 {
185    let Json(args) = args;
186    Ok(u32::from(tty_confirm_impl(&args.msg, args.default)))
187});
188
189host_fn!(pub _hm_browser_open(_user_data: (); url: String) -> u32 {
190    Ok(u32::from(browser_open_impl(&url)))
191});
192
193host_fn!(pub _hm_spawn_loopback(_user_data: (); port: Json<Option<u16>>) -> Json<LoopbackHandle> {
194    let Json(port) = port;
195    let handle = tokio::task::block_in_place(|| {
196        tokio::runtime::Handle::current().block_on(spawn_loopback_impl_async(port))
197    })?;
198    Ok(Json(handle))
199});
200
201host_fn!(pub _hm_loopback_recv(_user_data: (); args: Json<LoopbackRecvArgs>) -> Json<Option<CallbackData>> {
202    let Json(args) = args;
203    let data = tokio::task::block_in_place(|| {
204        tokio::runtime::Handle::current().block_on(loopback_recv_impl_async(args))
205    });
206    Ok(Json(data))
207});
208
209host_fn!(pub _hm_should_cancel(_user_data: ();) -> u32 {
210    Ok(u32::from(should_cancel_impl()))
211});
212
213host_fn!(pub _hm_docker_ping(_user_data: ();) -> u32 {
214    let ok = tokio::task::block_in_place(|| {
215        tokio::runtime::Handle::current()
216            .block_on(crate::orchestrator::docker_host_fns::ping_impl())
217    });
218    Ok(u32::from(ok))
219});
220
221host_fn!(pub _hm_docker_image_exists(_user_data: (); tag: String) -> u32 {
222    let exists = tokio::task::block_in_place(|| {
223        tokio::runtime::Handle::current()
224            .block_on(crate::orchestrator::docker_host_fns::image_exists_impl(tag))
225    });
226    Ok(u32::from(exists))
227});
228
229host_fn!(pub _hm_docker_pull(_user_data: (); tag: String) {
230    tokio::task::block_in_place(|| {
231        tokio::runtime::Handle::current()
232            .block_on(crate::orchestrator::docker_host_fns::pull_impl(tag))
233    })?;
234    Ok(())
235});
236
237host_fn!(pub _hm_docker_start_container(_user_data: (); args: Json<DockerStartArgs>) -> String {
238    let Json(args) = args;
239    let id = tokio::task::block_in_place(|| {
240        tokio::runtime::Handle::current()
241            .block_on(crate::orchestrator::docker_host_fns::start_container_impl(args))
242    })?;
243    Ok(id)
244});
245
246host_fn!(pub _hm_docker_extract_workspace(_user_data: (); args: Json<DockerExtractArgs>) {
247    let Json(args) = args;
248    tokio::task::block_in_place(|| {
249        tokio::runtime::Handle::current()
250            .block_on(crate::orchestrator::docker_host_fns::extract_workspace_impl(args))
251    })?;
252    Ok(())
253});
254
255host_fn!(pub _hm_docker_exec(_user_data: (); args: Json<DockerExecArgs>) -> i32 {
256    let Json(args) = args;
257    let rc = tokio::task::block_in_place(|| {
258        tokio::runtime::Handle::current()
259            .block_on(crate::orchestrator::docker_host_fns::exec_impl(args))
260    })?;
261    Ok(rc)
262});
263
264host_fn!(pub _hm_docker_commit(_user_data: (); args: Json<DockerCommitArgs>) -> String {
265    let Json(args) = args;
266    let tag = tokio::task::block_in_place(|| {
267        tokio::runtime::Handle::current()
268            .block_on(crate::orchestrator::docker_host_fns::commit_impl(args))
269    })?;
270    Ok(tag)
271});
272
273host_fn!(pub _hm_docker_remove_image(_user_data: (); tag: String) {
274    let _ = tokio::task::block_in_place(|| {
275        tokio::runtime::Handle::current()
276            .block_on(crate::orchestrator::docker_host_fns::remove_image_impl(tag))
277    });
278    Ok(())
279});
280
281host_fn!(pub _hm_docker_stop_remove(_user_data: (); container_id: String) {
282    tokio::task::block_in_place(|| {
283        tokio::runtime::Handle::current()
284            .block_on(crate::orchestrator::docker_host_fns::stop_remove_impl(container_id));
285    });
286    Ok(())
287});
288
289host_fn!(pub _hm_write_stdout(_user_data: (); bytes: Vec<u8>) {
290    write_stdout_impl(&bytes);
291    Ok(())
292});
293
294host_fn!(pub _hm_write_stderr(_user_data: (); bytes: Vec<u8>) {
295    write_stderr_impl(&bytes);
296    Ok(())
297});
298
299/// Returns the host function table passed into every `Plugin::new`.
300///
301/// extism wraps every host-fn argument and return value as a 64-bit
302/// memory handle (`PTR == ValType::I64`), regardless of the underlying
303/// Rust type. So every `params: …` and `returns: …` list below is just
304/// `[PTR; N]` where `N` is the arg/return arity.
305pub fn all() -> Vec<Function> {
306    let ud: UserData<()> = UserData::default();
307    fn pty(n: usize) -> Vec<ValType> {
308        (0..n).map(|_| PTR).collect()
309    }
310    vec![
311        Function::new("hm_log", pty(2), pty(0), ud.clone(), _hm_log),
312        Function::new(
313            "hm_emit_step_log",
314            pty(2),
315            pty(0),
316            ud.clone(),
317            _hm_emit_step_log,
318        ),
319        Function::new("hm_emit_event", pty(1), pty(0), ud.clone(), _hm_emit_event),
320        Function::new("hm_kv_get", pty(2), pty(1), ud.clone(), _hm_kv_get),
321        Function::new("hm_kv_set", pty(3), pty(0), ud.clone(), _hm_kv_set),
322        Function::new(
323            "hm_archive_read",
324            pty(1),
325            pty(1),
326            ud.clone(),
327            _hm_archive_read,
328        ),
329        Function::new(
330            "hm_archive_total_size",
331            pty(1),
332            pty(1),
333            ud.clone(),
334            _hm_archive_total_size,
335        ),
336        Function::new(
337            "hm_fs_read_config",
338            pty(1),
339            pty(1),
340            ud.clone(),
341            _hm_fs_read_config,
342        ),
343        Function::new(
344            "hm_unix_socket_connect",
345            pty(1),
346            pty(1),
347            ud.clone(),
348            _hm_unix_socket_connect,
349        ),
350        Function::new(
351            "hm_socket_write",
352            pty(1),
353            pty(1),
354            ud.clone(),
355            _hm_socket_write,
356        ),
357        Function::new(
358            "hm_socket_read",
359            pty(1),
360            pty(1),
361            ud.clone(),
362            _hm_socket_read,
363        ),
364        Function::new(
365            "hm_socket_close",
366            pty(1),
367            pty(0),
368            ud.clone(),
369            _hm_socket_close,
370        ),
371        Function::new(
372            "hm_keyring_get",
373            pty(1),
374            pty(1),
375            ud.clone(),
376            _hm_keyring_get,
377        ),
378        Function::new(
379            "hm_keyring_set",
380            pty(1),
381            pty(0),
382            ud.clone(),
383            _hm_keyring_set,
384        ),
385        Function::new(
386            "hm_keyring_delete",
387            pty(1),
388            pty(0),
389            ud.clone(),
390            _hm_keyring_delete,
391        ),
392        Function::new("hm_tty_prompt", pty(1), pty(1), ud.clone(), _hm_tty_prompt),
393        Function::new(
394            "hm_tty_confirm",
395            pty(1),
396            pty(1),
397            ud.clone(),
398            _hm_tty_confirm,
399        ),
400        Function::new(
401            "hm_browser_open",
402            pty(1),
403            pty(1),
404            ud.clone(),
405            _hm_browser_open,
406        ),
407        Function::new(
408            "hm_spawn_loopback",
409            pty(1),
410            pty(1),
411            ud.clone(),
412            _hm_spawn_loopback,
413        ),
414        Function::new(
415            "hm_loopback_recv",
416            pty(1),
417            pty(1),
418            ud.clone(),
419            _hm_loopback_recv,
420        ),
421        Function::new(
422            "hm_should_cancel",
423            pty(0),
424            pty(1),
425            ud.clone(),
426            _hm_should_cancel,
427        ),
428        Function::new(
429            "hm_docker_ping",
430            pty(0),
431            pty(1),
432            ud.clone(),
433            _hm_docker_ping,
434        ),
435        Function::new(
436            "hm_docker_image_exists",
437            pty(1),
438            pty(1),
439            ud.clone(),
440            _hm_docker_image_exists,
441        ),
442        Function::new(
443            "hm_docker_pull",
444            pty(1),
445            pty(0),
446            ud.clone(),
447            _hm_docker_pull,
448        ),
449        Function::new(
450            "hm_docker_start_container",
451            pty(1),
452            pty(1),
453            ud.clone(),
454            _hm_docker_start_container,
455        ),
456        Function::new(
457            "hm_docker_extract_workspace",
458            pty(1),
459            pty(0),
460            ud.clone(),
461            _hm_docker_extract_workspace,
462        ),
463        Function::new(
464            "hm_docker_exec",
465            pty(1),
466            pty(1),
467            ud.clone(),
468            _hm_docker_exec,
469        ),
470        Function::new(
471            "hm_docker_commit",
472            pty(1),
473            pty(1),
474            ud.clone(),
475            _hm_docker_commit,
476        ),
477        Function::new(
478            "hm_docker_remove_image",
479            pty(1),
480            pty(0),
481            ud.clone(),
482            _hm_docker_remove_image,
483        ),
484        Function::new(
485            "hm_docker_stop_remove",
486            pty(1),
487            pty(0),
488            ud,
489            _hm_docker_stop_remove,
490        ),
491        Function::new(
492            "hm_write_stdout",
493            [ValType::I64],
494            [],
495            UserData::default(),
496            _hm_write_stdout,
497        ),
498        Function::new(
499            "hm_write_stderr",
500            [ValType::I64],
501            [],
502            UserData::default(),
503            _hm_write_stderr,
504        ),
505    ]
506}
507
508// ─── Implementations (minimal, correct, lockable). ──────────────────────────
509// "Minimal" here means: the simple host-side behaviour that fixture
510// tests in Task 28 will exercise. Heavier behaviours (real cancellation
511// propagation, archive byte-streaming under load) get hardened in
512// later plans when real plugins drive them.
513
514static GLOBAL: Lazy<Mutex<HostState>> = Lazy::new(|| Mutex::new(HostState::default()));
515
516#[derive(Debug, Default)]
517struct HostState {
518    build_kv: BTreeMap<String, Vec<u8>>,
519    step_kv: BTreeMap<String, Vec<u8>>,
520    // `SocketHandle` only implements `Hash + Eq`, not `Ord`, so a
521    // `HashMap` is the right shape here.
522    sockets: HashMap<SocketHandle, Vec<u8>>,
523    next_socket: u64,
524    /// Live loopback listeners. Keyed by the bound port (also the
525    /// returned `LoopbackHandle.0`). The `Arc<LoopbackSlot>` is shared
526    /// between the axum task and `loopback_recv_impl_async`.
527    /// `LoopbackHandle` implements `Hash + Eq` but not `Ord`, so this
528    /// is a `HashMap` rather than `BTreeMap` (same shape as `sockets`).
529    loopback_slots: HashMap<LoopbackHandle, Arc<LoopbackSlot>>,
530}
531
/// Per-handle state for an in-flight loopback listener.
///
/// `receiver` is `Some(_)` until the first `hm_loopback_recv` takes it;
/// later calls with the same handle find `None` and return nothing.
///
/// `shutdown_token` is the original token whose clones are handed to the
/// axum route closure and to the server's graceful-shutdown future. The
/// route closure cancels its clone after the first callback is captured,
/// which makes `axum::serve(..).with_graceful_shutdown(..)` return and
/// the listener close.
#[derive(Debug)]
struct LoopbackSlot {
    // Async mutex: `loopback_recv_impl_async` locks it with `.await`.
    receiver: tokio::sync::Mutex<Option<tokio::sync::oneshot::Receiver<CallbackData>>>,
    #[allow(
        dead_code,
        reason = "held to keep the token alive; cancellation is driven by the route closure's clone"
    )]
    shutdown_token: tokio_util::sync::CancellationToken,
}
548
549fn log_impl(level: Level, msg: &str) {
550    match level {
551        Level::Trace => tracing::trace!(target: "plugin", "{msg}"),
552        Level::Debug => tracing::debug!(target: "plugin", "{msg}"),
553        Level::Info => tracing::info!(target: "plugin", "{msg}"),
554        Level::Warn => tracing::warn!(target: "plugin", "{msg}"),
555        Level::Error => tracing::error!(target: "plugin", "{msg}"),
556    }
557}
558
559fn emit_step_log_impl(stream: StdStream, bytes: &[u8]) {
560    let Some(state) = crate::orchestrator::state::current() else {
561        return;
562    };
563    let Some(step_id) = current_step_id() else {
564        return;
565    };
566    let line = String::from_utf8_lossy(bytes).into_owned();
567    state.event_bus.emit(BuildEvent::StepLog {
568        step_id,
569        stream,
570        line,
571        ts: chrono::Utc::now(),
572    });
573}
574
575fn emit_event_impl(event: BuildEvent) {
576    if let Some(state) = crate::orchestrator::state::current() {
577        state.event_bus.emit(event);
578    }
579}
580
581fn kv_get_impl(scope: KvScope, key: &str) -> Option<Vec<u8>> {
582    match scope {
583        KvScope::Plugin => load_plugin_kv().get(key).cloned(),
584        KvScope::Build | KvScope::Step => {
585            let s = GLOBAL.lock().ok()?;
586            let m = match scope {
587                KvScope::Build => &s.build_kv,
588                KvScope::Step => &s.step_kv,
589                KvScope::Plugin => unreachable!(),
590            };
591            m.get(key).cloned()
592        }
593    }
594}
595
596#[doc(hidden)] // pub for integration tests; not stable API
597pub fn kv_set_impl(scope: KvScope, key: &str, val: Vec<u8>) {
598    match scope {
599        KvScope::Plugin => {
600            // Hold an exclusive advisory lock for the full read-modify-write
601            // window. Without this, concurrent writers each load the same map,
602            // insert into their own copy, and the second writer's atomic save
603            // clobbers the first writer's insert. See plugin_kv_concurrency.rs.
604            //
605            // If lock acquisition fails (no config dir, no current plugin
606            // name, fs error), we fall back to the prior unprotected write —
607            // better than dropping the value entirely. This matches the
608            // existing best-effort framing of save_plugin_kv.
609            let lock = lock_plugin_kv();
610            if lock.is_none() {
611                tracing::warn!(
612                    target: "plugin::kv",
613                    "plugin-scope KV lock acquisition failed; \
614                     falling back to unprotected write. Concurrent \
615                     writers may lose updates."
616                );
617            }
618            let mut kv = load_plugin_kv();
619            kv.insert(key.to_string(), val);
620            save_plugin_kv(&kv);
621            // `lock` drops here, releasing the file lock.
622        }
623        KvScope::Build | KvScope::Step => {
624            let Ok(mut s) = GLOBAL.lock() else { return };
625            let m = match scope {
626                KvScope::Build => &mut s.build_kv,
627                KvScope::Step => &mut s.step_kv,
628                KvScope::Plugin => unreachable!(),
629            };
630            m.insert(key.to_string(), val);
631        }
632    }
633}
634
635// ─── Disk-backed Plugin-scope KV ────────────────────────────────────────────
636//
637// `KvScope::Plugin` persists across hm invocations so plugins (e.g. the
638// cloud plugin) can stash the active org slug, last-seen tokens, etc.
639// Path: `<config_dir>/harmont/state/<plugin-name>.kv`. Per-plugin
640// isolation is enforced by the `CURRENT_PLUGIN_NAME` thread-local,
641// which `LoadedPlugin::call_capability` sets around every call.
642//
643// Concurrency: write operations (`KvScope::Plugin`) take an exclusive
644// advisory lock on a per-plugin `<plugin-name>.lock` sibling file via
645// `fs2::FileExt::lock_exclusive`. Readers do NOT lock —
646// `load_plugin_kv` is read-only and works against the atomically
647// written `.kv` file (tmp + rename in `save_plugin_kv`), so a reader
648// either sees the pre-write or post-write state, never a torn map.
649// Concurrent invocations of `hm` against the same plugin's KV
650// serialise on the `.lock` file; the held window is small (load +
651// insert + atomic write of a typically-small JSON map) so contention
652// is not a practical concern.
653
654fn plugin_state_path() -> Option<std::path::PathBuf> {
655    let dir = dirs::config_dir()?.join("harmont").join("state");
656    let plugin = current_plugin_name()?;
657    Some(dir.join(format!("{plugin}.kv")))
658}
659
660/// Acquire an exclusive advisory lock on `<config_dir>/harmont/state/<plugin>.lock`.
661///
662/// Returns `None` if `plugin_state_path()` couldn't resolve (no config
663/// dir or no current plugin name). The returned `File` releases the
664/// lock on drop — so the caller holds the lock for the lifetime of
665/// the binding.
666fn lock_plugin_kv() -> Option<std::fs::File> {
667    use fs2::FileExt;
668    let kv_path = plugin_state_path()?;
669    let lock_path = kv_path.with_extension("lock");
670    if let Some(parent) = lock_path.parent() {
671        let _ = std::fs::create_dir_all(parent);
672    }
673    let f = std::fs::OpenOptions::new()
674        .create(true)
675        .truncate(false)
676        .read(true)
677        .write(true)
678        .open(&lock_path)
679        .ok()?;
680    f.lock_exclusive().ok()?;
681    Some(f)
682}
683
/// Name of the plugin recorded for the current thread, if any.
fn current_plugin_name() -> Option<String> {
    CURRENT_PLUGIN_NAME.with_borrow(Clone::clone)
}

thread_local! {
    /// Per-thread record of which plugin is currently being called; used to
    /// pick that plugin's on-disk KV file.
    pub(crate) static CURRENT_PLUGIN_NAME: std::cell::RefCell<Option<String>> =
        const { std::cell::RefCell::new(None) };
}

/// Record `name` as the plugin running on this thread.
#[doc(hidden)] // pub for integration tests; not stable API
pub fn set_current_plugin_name(name: String) {
    CURRENT_PLUGIN_NAME.set(Some(name));
}

/// Forget the plugin recorded for this thread.
pub(crate) fn clear_current_plugin_name() {
    CURRENT_PLUGIN_NAME.set(None);
}
701
702#[doc(hidden)] // pub for integration tests; not stable API
703#[must_use]
704pub fn load_plugin_kv() -> BTreeMap<String, Vec<u8>> {
705    let Some(path) = plugin_state_path() else {
706        return BTreeMap::default();
707    };
708    let Ok(bytes) = std::fs::read(&path) else {
709        return BTreeMap::default();
710    };
711    serde_json::from_slice(&bytes).unwrap_or_default()
712}
713
714fn save_plugin_kv(kv: &BTreeMap<String, Vec<u8>>) {
715    let Some(path) = plugin_state_path() else {
716        return;
717    };
718    let Some(parent) = path.parent() else { return };
719    let _ = std::fs::create_dir_all(parent);
720    if let Ok(bytes) = serde_json::to_vec(kv) {
721        // Atomic write: tmpfile + rename. If rename fails the old file
722        // persists; best-effort.
723        let tmp = path.with_extension("kv.tmp");
724        if std::fs::write(&tmp, &bytes).is_ok() {
725            let _ = std::fs::rename(&tmp, &path);
726        }
727    }
728}
729
730fn archive_read_impl(args: ArchiveReadArgs) -> Vec<u8> {
731    crate::orchestrator::state::current()
732        .map(|s| s.archives.read(args.id, args.offset, args.max))
733        .unwrap_or_default()
734}
735
736fn archive_total_size_impl(id: ArchiveId) -> u64 {
737    crate::orchestrator::state::current()
738        .map(|s| s.archives.total_size(id))
739        .unwrap_or(0)
740}
741
/// Read a file from `<cwd>/.harmont`, addressed by `rel_path`.
///
/// Both the root and the requested file are canonicalised, and the read is
/// refused unless the resolved file still lives under the resolved root —
/// so `..` segments, absolute paths and symlinks cannot escape `.harmont`.
/// Returns `None` for any failure (no cwd, no `.harmont`, missing file,
/// escape attempt, read error).
fn fs_read_config_impl(rel_path: &str) -> Option<Vec<u8>> {
    let config_root = std::env::current_dir()
        .ok()?
        .join(".harmont")
        .canonicalize()
        .ok()?;
    let resolved = config_root.join(rel_path).canonicalize().ok()?;
    if resolved.starts_with(&config_root) {
        std::fs::read(resolved).ok()
    } else {
        None
    }
}
752
753fn unix_socket_connect_impl(_path: &str) -> SocketHandle {
754    let Ok(mut s) = GLOBAL.lock() else {
755        return SocketHandle(0);
756    };
757    s.next_socket += 1;
758    let h = SocketHandle(s.next_socket);
759    s.sockets.insert(h, Vec::new());
760    h
761}
762
763fn socket_write_impl(args: SocketWriteArgs) -> u64 {
764    let Ok(mut s) = GLOBAL.lock() else { return 0 };
765    if let Some(buf) = s.sockets.get_mut(&args.h) {
766        buf.extend_from_slice(&args.bytes);
767        args.bytes.len() as u64
768    } else {
769        0
770    }
771}
772
/// Stub: ignores its arguments and always returns an empty buffer.
fn socket_read_impl(_args: SocketReadArgs) -> Vec<u8> {
    // Plan 1: in-memory loopback for tests. Plan 2 swaps in a real
    // tokio UnixStream.
    Vec::new()
}
778
779fn socket_close_impl(h: SocketHandle) {
780    let Ok(mut s) = GLOBAL.lock() else { return };
781    s.sockets.remove(&h);
782}
783
/// Look up the secret for `(service, account)`; delegates to
/// `crate::creds_store::get`.
fn keyring_get_impl(service: &str, account: &str) -> Option<String> {
    crate::creds_store::get(service, account)
}

/// Store `secret` for `(service, account)`; delegates to
/// `crate::creds_store::set`. Nothing is reported back to the caller.
fn keyring_set_impl(service: &str, account: &str, secret: &str) {
    crate::creds_store::set(service, account, secret);
}

/// Remove the secret for `(service, account)`; delegates to
/// `crate::creds_store::delete`. Nothing is reported back to the caller.
fn keyring_delete_impl(service: &str, account: &str) {
    crate::creds_store::delete(service, account);
}
795
796fn tty_prompt_impl(msg: &str, mask: bool) -> String {
797    use dialoguer::{Input, Password};
798    if mask {
799        Password::new()
800            .with_prompt(msg)
801            .interact()
802            .unwrap_or_default()
803    } else {
804        Input::<String>::new()
805            .with_prompt(msg)
806            .interact_text()
807            .unwrap_or_default()
808    }
809}
810
811fn tty_confirm_impl(msg: &str, default: bool) -> bool {
812    use dialoguer::Confirm;
813    Confirm::new()
814        .with_prompt(msg)
815        .default(default)
816        .interact()
817        .unwrap_or(default)
818}
819
/// Ask `webbrowser::open` to open `url`; returns whether that call
/// reported success.
fn browser_open_impl(url: &str) -> bool {
    webbrowser::open(url).is_ok()
}
823
/// Bind a real axum oneshot on `127.0.0.1:<port>` (or any free port if
/// `port` is `None`). The first request to ANY path captures the URI's
/// `(path, query)` into a oneshot, then cancels the shutdown token so
/// the listener exits. Returns the bound port as a `LoopbackHandle`.
///
/// The plugin uses `handle.0` both as the recv handle and as the port
/// number to embed in its OAuth redirect URI (`http://127.0.0.1:<port>/cb`).
///
/// # Errors
///
/// Fails if the address cannot be bound, the bound address cannot be
/// read back, or the global host-state lock is poisoned.
async fn spawn_loopback_impl_async(port: Option<u16>) -> anyhow::Result<LoopbackHandle> {
    use anyhow::Context;
    use axum::Router;
    use axum::routing::get;
    use std::net::SocketAddr;

    // Port 0 asks the OS for any free port; the real one is read back below.
    let addr = SocketAddr::from(([127, 0, 0, 1], port.unwrap_or(0)));
    let listener = tokio::net::TcpListener::bind(addr)
        .await
        .with_context(|| format!("bind loopback on {addr}"))?;
    let bound_port = listener.local_addr()?.port();

    let (tx, rx) = tokio::sync::oneshot::channel::<CallbackData>();
    // The sender is moved into the route closure, which uses `.take()`
    // to ensure only the FIRST callback fires the channel. Wrapping in
    // `Arc<Mutex<Option<_>>>` makes the closure `Clone` (axum needs the
    // closure to be `FnOnce + Clone` for fallback handlers).
    let tx_for_route: Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<CallbackData>>>> =
        Arc::new(tokio::sync::Mutex::new(Some(tx)));
    let shutdown = tokio_util::sync::CancellationToken::new();

    let shutdown_for_route = shutdown.clone();
    let app = Router::new().fallback(get(move |uri: axum::http::Uri| {
        let tx = tx_for_route.clone();
        let shutdown = shutdown_for_route.clone();
        async move {
            let path = uri.path().to_string();
            // Repeated query keys collapse to the last value seen.
            let mut query: BTreeMap<String, String> = BTreeMap::new();
            if let Some(q) = uri.query() {
                for (k, v) in url::form_urlencoded::parse(q.as_bytes()) {
                    query.insert(k.into_owned(), v.into_owned());
                }
            }
            let data = CallbackData { path, query };
            // A send error just means the receiver is gone; nothing to do.
            if let Some(t) = tx.lock().await.take() {
                let _ = t.send(data);
            }
            // Every request, not only the first, requests shutdown.
            shutdown.cancel();
            axum::response::Html(
                "<!DOCTYPE html><html><body><h1>You can close this tab.</h1></body></html>",
            )
        }
    }));

    // The server runs detached; its result is discarded.
    let shutdown_for_server = shutdown.clone();
    tokio::spawn(async move {
        let _ = axum::serve(listener, app)
            .with_graceful_shutdown(shutdown_for_server.cancelled_owned())
            .await;
    });

    let handle = LoopbackHandle(u64::from(bound_port));
    let slot = Arc::new(LoopbackSlot {
        receiver: tokio::sync::Mutex::new(Some(rx)),
        shutdown_token: shutdown,
    });
    {
        // Scoped so the std mutex guard is released before returning.
        let mut g = GLOBAL
            .lock()
            .map_err(|_| anyhow::anyhow!("global host state lock poisoned"))?;
        // An existing slot under the same port is replaced.
        g.loopback_slots.insert(handle, slot);
    }
    Ok(handle)
}
895
896/// Await the matching slot's oneshot receiver for up to `timeout_ms`
897/// milliseconds. Returns `None` on timeout, on unknown handle, or if the
898/// receiver has already been consumed.
899async fn loopback_recv_impl_async(args: LoopbackRecvArgs) -> Option<CallbackData> {
900    let slot = {
901        let g = GLOBAL.lock().ok()?;
902        g.loopback_slots.get(&args.h).cloned()
903    }?;
904    // Hold the slot's async mutex only long enough to `.take()` the
905    // receiver — the actual await happens outside the lock so a second
906    // caller doesn't block while the first waits.
907    let rx_opt = {
908        let mut rx_guard = slot.receiver.lock().await;
909        rx_guard.take()
910    };
911    let rx = rx_opt?;
912    match tokio::time::timeout(
913        std::time::Duration::from_millis(u64::from(args.timeout_ms)),
914        rx,
915    )
916    .await
917    {
918        Ok(Ok(data)) => Some(data),
919        _ => None,
920    }
921}
922
923fn should_cancel_impl() -> bool {
924    crate::orchestrator::state::current()
925        .map(|s| s.cancel.is_cancelled())
926        .unwrap_or(false)
927}
928
#[allow(
    clippy::print_stdout,
    reason = "this fn's purpose is user-facing stdout output"
)]
/// Write `bytes` to the process's stdout and flush, ignoring I/O errors.
fn write_stdout_impl(bytes: &[u8]) {
    use std::io::Write;

    let stdout = std::io::stdout();
    // Take the lock once so the write and the flush happen under the
    // same guard.
    let mut sink = stdout.lock();
    // Best-effort by design: both results are discarded so a broken
    // stdout (e.g. SIGPIPE) never panics here. The flush is attempted
    // even when the write failed.
    let _ = sink.write_all(bytes);
    let _ = sink.flush();
}
941
#[allow(
    clippy::print_stderr,
    reason = "this fn's purpose is user-facing stderr output"
)]
/// Write `bytes` to the process's stderr and flush, ignoring I/O errors.
fn write_stderr_impl(bytes: &[u8]) {
    use std::io::Write;

    let stderr = std::io::stderr();
    // Take the lock once so the write and the flush happen under the
    // same guard.
    let mut sink = stderr.lock();
    // Best-effort: both results are discarded rather than propagated,
    // and the flush is attempted even when the write failed.
    let _ = sink.write_all(bytes);
    let _ = sink.flush();
}
952
953// ─── Per-step thread-local context ─────────────────────────────────────────
954//
955// The scheduler sets `CURRENT_STEP_ID` around each
956// `call_capability("hm_executor_run", …)` invocation so host fns like
957// `emit_step_log` can tag emitted events with the right step. Outside an
958// orchestrator-driven run the cell stays `None`, and those host fns
959// short-circuit to a no-op.
960
thread_local! {
    /// Step id associated with the current thread, or `None` when unset.
    ///
    /// Starts out as `None`. Written by `set_current_step_id` and
    /// `clear_current_step_id`, read by `current_step_id`. Being
    /// thread-local, a value set on one thread is not visible on another.
    static CURRENT_STEP_ID: std::cell::Cell<Option<uuid::Uuid>> =
        const { std::cell::Cell::new(None) };
}
965
966// Callers land in cluster 10 (scheduler); these setters are part of
967// the public-within-crate API the scheduler will wire up.
968#[allow(dead_code)]
969pub(crate) fn set_current_step_id(id: uuid::Uuid) {
970    CURRENT_STEP_ID.with(|c| c.set(Some(id)));
971}
972
973#[allow(dead_code)]
974pub(crate) fn clear_current_step_id() {
975    CURRENT_STEP_ID.with(|c| c.set(None));
976}
977
978pub(crate) fn current_step_id() -> Option<uuid::Uuid> {
979    CURRENT_STEP_ID.with(std::cell::Cell::get)
980}
981
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::panic,
    unsafe_code,
    reason = "tests poke env vars via std::env::set_var, which is unsafe in Rust 2024"
)]
mod plugin_kv_tests {
    use super::*;

    /// Serialises the tests in this module. `XDG_CONFIG_HOME` is
    /// process-wide and the test harness runs tests on parallel threads,
    /// so without this lock one test could redirect the other's config
    /// dir between its write and its read.
    static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Points `XDG_CONFIG_HOME` at a fresh temp dir for as long as the
    /// guard lives, then restores the previous value on drop so we never
    /// stomp on the developer's real ~/.config/harmont/state nor leave
    /// the variable pointing at a deleted directory.
    struct TempConfigHome {
        previous: Option<std::ffi::OsString>,
        _dir: tempfile::TempDir,
        _serial: std::sync::MutexGuard<'static, ()>,
    }

    impl TempConfigHome {
        fn enter() -> Self {
            // A poisoned lock only means an earlier test panicked; its
            // guard's `Drop` still restored the environment, so carry on.
            let serial = ENV_LOCK
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            let dir = tempfile::tempdir().unwrap();
            let previous = std::env::var_os("XDG_CONFIG_HOME");
            // SAFETY: `ENV_LOCK` is held, so no other test in this module
            // mutates the environment concurrently. Threads outside this
            // module may still read it; that is the residual risk any
            // `set_var` carries in a threaded test binary.
            unsafe {
                std::env::set_var("XDG_CONFIG_HOME", dir.path());
            }
            Self {
                previous,
                _dir: dir,
                _serial: serial,
            }
        }
    }

    impl Drop for TempConfigHome {
        fn drop(&mut self) {
            // SAFETY: fields are dropped only after this body returns, so
            // `ENV_LOCK` is still held while the variable is restored.
            unsafe {
                match self.previous.take() {
                    Some(value) => std::env::set_var("XDG_CONFIG_HOME", value),
                    None => std::env::remove_var("XDG_CONFIG_HOME"),
                }
            }
        }
    }

    #[test]
    fn plugin_kv_round_trip_through_disk() {
        let _home = TempConfigHome::enter();
        set_current_plugin_name("test-plugin".into());

        kv_set_impl(KvScope::Plugin, "key", b"value".to_vec());
        assert_eq!(kv_get_impl(KvScope::Plugin, "key"), Some(b"value".to_vec()));

        // Re-read: the value is expected to come back from the on-disk
        // file. NOTE(review): nothing here discards in-memory state, so
        // this only simulates a fresh process if `kv_get_impl` always
        // reads from disk — confirm.
        let again = kv_get_impl(KvScope::Plugin, "key");
        assert_eq!(again, Some(b"value".to_vec()));

        clear_current_plugin_name();
    }

    #[test]
    fn plugin_kv_isolated_per_plugin_name() {
        let _home = TempConfigHome::enter();

        // A key written under one plugin name must be invisible under
        // another, and still readable once we switch back.
        set_current_plugin_name("alpha".into());
        kv_set_impl(KvScope::Plugin, "k", b"a".to_vec());

        set_current_plugin_name("beta".into());
        assert_eq!(kv_get_impl(KvScope::Plugin, "k"), None);

        set_current_plugin_name("alpha".into());
        assert_eq!(kv_get_impl(KvScope::Plugin, "k"), Some(b"a".to_vec()));

        clear_current_plugin_name();
    }
}
1035
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod loopback_tests {
    use super::*;

    /// End-to-end: spawn a loopback listener, hit it once over HTTP, and
    /// check that the receiver yields the request's path and query pairs.
    #[tokio::test(flavor = "multi_thread")]
    async fn spawn_then_recv_callback() {
        let handle = spawn_loopback_impl_async(None).await.unwrap();
        // The handle's inner value doubles as the port to call back on.
        let port = handle.0;
        let url = format!("http://127.0.0.1:{port}/cb?code=xyz&state=abc");

        // Fire the callback from a detached task. The listener captures
        // the URI and shuts down after responding, so the client may see
        // either a clean close or a reset — neither affects the
        // assertions below, hence the ignored result.
        let _request_task = tokio::spawn(async move {
            let _ = reqwest::get(&url).await;
        });

        let request = LoopbackRecvArgs {
            h: handle,
            timeout_ms: 5000,
        };
        let data = loopback_recv_impl_async(request)
            .await
            .expect("got callback");

        assert_eq!(data.path, "/cb");
        assert_eq!(data.query.get("code").map(String::as_str), Some("xyz"));
        assert_eq!(data.query.get("state").map(String::as_str), Some("abc"));
    }
}
1065}