1#![allow(clippy::needless_pass_by_value)]
11#![allow(clippy::unnecessary_wraps)]
12#![allow(clippy::wildcard_imports)]
13#![allow(clippy::missing_errors_doc)]
14#![allow(clippy::cast_possible_truncation)]
17#![allow(clippy::cast_sign_loss)]
18#![allow(clippy::too_many_lines)]
21#![allow(clippy::items_after_statements)]
24#![allow(clippy::missing_const_for_fn)]
28#![allow(clippy::map_unwrap_or)]
32#![allow(clippy::incompatible_msrv)]
35#![allow(clippy::non_std_lazy_statics)]
36#![allow(clippy::significant_drop_in_scrutinee)]
40#![allow(clippy::significant_drop_tightening)]
41
42use std::collections::{BTreeMap, HashMap};
43use std::sync::{Arc, Mutex};
44
45use extism::convert::Json;
46use extism::{Function, PTR, UserData, ValType, host_fn};
47use hm_plugin_protocol::host_abi::{
48 ArchiveReadArgs, CallbackData, KeyringArgs, KeyringSetArgs, KvScope, Level, LoopbackHandle,
49 LoopbackRecvArgs, SocketHandle, SocketReadArgs, SocketWriteArgs, TtyConfirmArgs, TtyPromptArgs,
50};
51use hm_plugin_protocol::{
52 ArchiveId, BuildEvent, DockerCommitArgs, DockerExecArgs, DockerExtractArgs, DockerStartArgs,
53 StdStream,
54};
55use once_cell::sync::Lazy;
56
/// Names of every host function this module registers for plugins.
///
/// The entries appear in the same order as the registrations built by
/// [`all`]; keep the two lists in sync when adding or removing a host
/// function.
pub const HOST_FN_NAMES: &[&str] = &[
    // Logging and build events.
    "hm_log",
    "hm_emit_step_log",
    "hm_emit_event",
    // Key/value storage.
    "hm_kv_get",
    "hm_kv_set",
    // Archive and config access.
    "hm_archive_read",
    "hm_archive_total_size",
    "hm_fs_read_config",
    // Socket handles.
    "hm_unix_socket_connect",
    "hm_socket_write",
    "hm_socket_read",
    "hm_socket_close",
    // Credential storage.
    "hm_keyring_get",
    "hm_keyring_set",
    "hm_keyring_delete",
    // Interactive helpers.
    "hm_tty_prompt",
    "hm_tty_confirm",
    "hm_browser_open",
    // Loopback callback listener.
    "hm_spawn_loopback",
    "hm_loopback_recv",
    // Cancellation polling.
    "hm_should_cancel",
    // Docker operations.
    "hm_docker_ping",
    "hm_docker_image_exists",
    "hm_docker_pull",
    "hm_docker_start_container",
    "hm_docker_extract_workspace",
    "hm_docker_exec",
    "hm_docker_commit",
    "hm_docker_remove_image",
    "hm_docker_stop_remove",
    // Raw stdout/stderr passthrough.
    "hm_write_stdout",
    "hm_write_stderr",
];
93
94host_fn!(pub _hm_log(_user_data: (); level: Json<Level>, msg: String) {
100 let Json(level) = level;
101 log_impl(level, &msg);
102 Ok(())
103});
104
105host_fn!(pub _hm_emit_step_log(_user_data: (); stream: Json<StdStream>, bytes: Vec<u8>) {
106 let Json(stream) = stream;
107 emit_step_log_impl(stream, &bytes);
108 Ok(())
109});
110
111host_fn!(pub _hm_emit_event(_user_data: (); event: Json<BuildEvent>) {
112 let Json(event) = event;
113 emit_event_impl(event);
114 Ok(())
115});
116
117host_fn!(pub _hm_kv_get(_user_data: (); scope: Json<KvScope>, key: String) -> Json<Option<Vec<u8>>> {
118 let Json(scope) = scope;
119 Ok(Json(kv_get_impl(scope, &key)))
120});
121
122host_fn!(pub _hm_kv_set(_user_data: (); scope: Json<KvScope>, key: String, val: Vec<u8>) {
123 let Json(scope) = scope;
124 kv_set_impl(scope, &key, val);
125 Ok(())
126});
127
128host_fn!(pub _hm_archive_read(_user_data: (); args: Json<ArchiveReadArgs>) -> Vec<u8> {
129 let Json(args) = args;
130 Ok(archive_read_impl(args))
131});
132
133host_fn!(pub _hm_archive_total_size(_user_data: (); id: Json<ArchiveId>) -> u64 {
134 let Json(id) = id;
135 Ok(archive_total_size_impl(id))
136});
137
138host_fn!(pub _hm_fs_read_config(_user_data: (); rel_path: String) -> Json<Option<Vec<u8>>> {
139 Ok(Json(fs_read_config_impl(&rel_path)))
140});
141
142host_fn!(pub _hm_unix_socket_connect(_user_data: (); path: String) -> Json<SocketHandle> {
143 Ok(Json(unix_socket_connect_impl(&path)))
144});
145
146host_fn!(pub _hm_socket_write(_user_data: (); args: Json<SocketWriteArgs>) -> u64 {
147 let Json(args) = args;
148 Ok(socket_write_impl(args))
149});
150
151host_fn!(pub _hm_socket_read(_user_data: (); args: Json<SocketReadArgs>) -> Vec<u8> {
152 let Json(args) = args;
153 Ok(socket_read_impl(args))
154});
155
156host_fn!(pub _hm_socket_close(_user_data: (); h: Json<SocketHandle>) {
157 let Json(h) = h;
158 socket_close_impl(h);
159 Ok(())
160});
161
162host_fn!(pub _hm_keyring_get(_user_data: (); args: Json<KeyringArgs>) -> Json<Option<String>> {
163 let Json(args) = args;
164 Ok(Json(keyring_get_impl(&args.service, &args.account)))
165});
166
167host_fn!(pub _hm_keyring_set(_user_data: (); args: Json<KeyringSetArgs>) {
168 let Json(args) = args;
169 keyring_set_impl(&args.service, &args.account, &args.secret);
170 Ok(())
171});
172
173host_fn!(pub _hm_keyring_delete(_user_data: (); args: Json<KeyringArgs>) {
174 let Json(args) = args;
175 keyring_delete_impl(&args.service, &args.account);
176 Ok(())
177});
178
179host_fn!(pub _hm_tty_prompt(_user_data: (); args: Json<TtyPromptArgs>) -> String {
180 let Json(args) = args;
181 Ok(tty_prompt_impl(&args.msg, args.mask))
182});
183
184host_fn!(pub _hm_tty_confirm(_user_data: (); args: Json<TtyConfirmArgs>) -> u32 {
185 let Json(args) = args;
186 Ok(u32::from(tty_confirm_impl(&args.msg, args.default)))
187});
188
189host_fn!(pub _hm_browser_open(_user_data: (); url: String) -> u32 {
190 Ok(u32::from(browser_open_impl(&url)))
191});
192
193host_fn!(pub _hm_spawn_loopback(_user_data: (); port: Json<Option<u16>>) -> Json<LoopbackHandle> {
194 let Json(port) = port;
195 let handle = tokio::task::block_in_place(|| {
196 tokio::runtime::Handle::current().block_on(spawn_loopback_impl_async(port))
197 })?;
198 Ok(Json(handle))
199});
200
201host_fn!(pub _hm_loopback_recv(_user_data: (); args: Json<LoopbackRecvArgs>) -> Json<Option<CallbackData>> {
202 let Json(args) = args;
203 let data = tokio::task::block_in_place(|| {
204 tokio::runtime::Handle::current().block_on(loopback_recv_impl_async(args))
205 });
206 Ok(Json(data))
207});
208
209host_fn!(pub _hm_should_cancel(_user_data: ();) -> u32 {
210 Ok(u32::from(should_cancel_impl()))
211});
212
213host_fn!(pub _hm_docker_ping(_user_data: ();) -> u32 {
214 let ok = tokio::task::block_in_place(|| {
215 tokio::runtime::Handle::current()
216 .block_on(crate::orchestrator::docker_host_fns::ping_impl())
217 });
218 Ok(u32::from(ok))
219});
220
221host_fn!(pub _hm_docker_image_exists(_user_data: (); tag: String) -> u32 {
222 let exists = tokio::task::block_in_place(|| {
223 tokio::runtime::Handle::current()
224 .block_on(crate::orchestrator::docker_host_fns::image_exists_impl(tag))
225 });
226 Ok(u32::from(exists))
227});
228
229host_fn!(pub _hm_docker_pull(_user_data: (); tag: String) {
230 tokio::task::block_in_place(|| {
231 tokio::runtime::Handle::current()
232 .block_on(crate::orchestrator::docker_host_fns::pull_impl(tag))
233 })?;
234 Ok(())
235});
236
237host_fn!(pub _hm_docker_start_container(_user_data: (); args: Json<DockerStartArgs>) -> String {
238 let Json(args) = args;
239 let id = tokio::task::block_in_place(|| {
240 tokio::runtime::Handle::current()
241 .block_on(crate::orchestrator::docker_host_fns::start_container_impl(args))
242 })?;
243 Ok(id)
244});
245
246host_fn!(pub _hm_docker_extract_workspace(_user_data: (); args: Json<DockerExtractArgs>) {
247 let Json(args) = args;
248 tokio::task::block_in_place(|| {
249 tokio::runtime::Handle::current()
250 .block_on(crate::orchestrator::docker_host_fns::extract_workspace_impl(args))
251 })?;
252 Ok(())
253});
254
255host_fn!(pub _hm_docker_exec(_user_data: (); args: Json<DockerExecArgs>) -> i32 {
256 let Json(args) = args;
257 let rc = tokio::task::block_in_place(|| {
258 tokio::runtime::Handle::current()
259 .block_on(crate::orchestrator::docker_host_fns::exec_impl(args))
260 })?;
261 Ok(rc)
262});
263
264host_fn!(pub _hm_docker_commit(_user_data: (); args: Json<DockerCommitArgs>) -> String {
265 let Json(args) = args;
266 let tag = tokio::task::block_in_place(|| {
267 tokio::runtime::Handle::current()
268 .block_on(crate::orchestrator::docker_host_fns::commit_impl(args))
269 })?;
270 Ok(tag)
271});
272
273host_fn!(pub _hm_docker_remove_image(_user_data: (); tag: String) {
274 let _ = tokio::task::block_in_place(|| {
275 tokio::runtime::Handle::current()
276 .block_on(crate::orchestrator::docker_host_fns::remove_image_impl(tag))
277 });
278 Ok(())
279});
280
281host_fn!(pub _hm_docker_stop_remove(_user_data: (); container_id: String) {
282 tokio::task::block_in_place(|| {
283 tokio::runtime::Handle::current()
284 .block_on(crate::orchestrator::docker_host_fns::stop_remove_impl(container_id));
285 });
286 Ok(())
287});
288
289host_fn!(pub _hm_write_stdout(_user_data: (); bytes: Vec<u8>) {
290 write_stdout_impl(&bytes);
291 Ok(())
292});
293
294host_fn!(pub _hm_write_stderr(_user_data: (); bytes: Vec<u8>) {
295 write_stderr_impl(&bytes);
296 Ok(())
297});
298
299pub fn all() -> Vec<Function> {
306 let ud: UserData<()> = UserData::default();
307 fn pty(n: usize) -> Vec<ValType> {
308 (0..n).map(|_| PTR).collect()
309 }
310 vec![
311 Function::new("hm_log", pty(2), pty(0), ud.clone(), _hm_log),
312 Function::new(
313 "hm_emit_step_log",
314 pty(2),
315 pty(0),
316 ud.clone(),
317 _hm_emit_step_log,
318 ),
319 Function::new("hm_emit_event", pty(1), pty(0), ud.clone(), _hm_emit_event),
320 Function::new("hm_kv_get", pty(2), pty(1), ud.clone(), _hm_kv_get),
321 Function::new("hm_kv_set", pty(3), pty(0), ud.clone(), _hm_kv_set),
322 Function::new(
323 "hm_archive_read",
324 pty(1),
325 pty(1),
326 ud.clone(),
327 _hm_archive_read,
328 ),
329 Function::new(
330 "hm_archive_total_size",
331 pty(1),
332 pty(1),
333 ud.clone(),
334 _hm_archive_total_size,
335 ),
336 Function::new(
337 "hm_fs_read_config",
338 pty(1),
339 pty(1),
340 ud.clone(),
341 _hm_fs_read_config,
342 ),
343 Function::new(
344 "hm_unix_socket_connect",
345 pty(1),
346 pty(1),
347 ud.clone(),
348 _hm_unix_socket_connect,
349 ),
350 Function::new(
351 "hm_socket_write",
352 pty(1),
353 pty(1),
354 ud.clone(),
355 _hm_socket_write,
356 ),
357 Function::new(
358 "hm_socket_read",
359 pty(1),
360 pty(1),
361 ud.clone(),
362 _hm_socket_read,
363 ),
364 Function::new(
365 "hm_socket_close",
366 pty(1),
367 pty(0),
368 ud.clone(),
369 _hm_socket_close,
370 ),
371 Function::new(
372 "hm_keyring_get",
373 pty(1),
374 pty(1),
375 ud.clone(),
376 _hm_keyring_get,
377 ),
378 Function::new(
379 "hm_keyring_set",
380 pty(1),
381 pty(0),
382 ud.clone(),
383 _hm_keyring_set,
384 ),
385 Function::new(
386 "hm_keyring_delete",
387 pty(1),
388 pty(0),
389 ud.clone(),
390 _hm_keyring_delete,
391 ),
392 Function::new("hm_tty_prompt", pty(1), pty(1), ud.clone(), _hm_tty_prompt),
393 Function::new(
394 "hm_tty_confirm",
395 pty(1),
396 pty(1),
397 ud.clone(),
398 _hm_tty_confirm,
399 ),
400 Function::new(
401 "hm_browser_open",
402 pty(1),
403 pty(1),
404 ud.clone(),
405 _hm_browser_open,
406 ),
407 Function::new(
408 "hm_spawn_loopback",
409 pty(1),
410 pty(1),
411 ud.clone(),
412 _hm_spawn_loopback,
413 ),
414 Function::new(
415 "hm_loopback_recv",
416 pty(1),
417 pty(1),
418 ud.clone(),
419 _hm_loopback_recv,
420 ),
421 Function::new(
422 "hm_should_cancel",
423 pty(0),
424 pty(1),
425 ud.clone(),
426 _hm_should_cancel,
427 ),
428 Function::new(
429 "hm_docker_ping",
430 pty(0),
431 pty(1),
432 ud.clone(),
433 _hm_docker_ping,
434 ),
435 Function::new(
436 "hm_docker_image_exists",
437 pty(1),
438 pty(1),
439 ud.clone(),
440 _hm_docker_image_exists,
441 ),
442 Function::new(
443 "hm_docker_pull",
444 pty(1),
445 pty(0),
446 ud.clone(),
447 _hm_docker_pull,
448 ),
449 Function::new(
450 "hm_docker_start_container",
451 pty(1),
452 pty(1),
453 ud.clone(),
454 _hm_docker_start_container,
455 ),
456 Function::new(
457 "hm_docker_extract_workspace",
458 pty(1),
459 pty(0),
460 ud.clone(),
461 _hm_docker_extract_workspace,
462 ),
463 Function::new(
464 "hm_docker_exec",
465 pty(1),
466 pty(1),
467 ud.clone(),
468 _hm_docker_exec,
469 ),
470 Function::new(
471 "hm_docker_commit",
472 pty(1),
473 pty(1),
474 ud.clone(),
475 _hm_docker_commit,
476 ),
477 Function::new(
478 "hm_docker_remove_image",
479 pty(1),
480 pty(0),
481 ud.clone(),
482 _hm_docker_remove_image,
483 ),
484 Function::new(
485 "hm_docker_stop_remove",
486 pty(1),
487 pty(0),
488 ud,
489 _hm_docker_stop_remove,
490 ),
491 Function::new(
492 "hm_write_stdout",
493 [ValType::I64],
494 [],
495 UserData::default(),
496 _hm_write_stdout,
497 ),
498 Function::new(
499 "hm_write_stderr",
500 [ValType::I64],
501 [],
502 UserData::default(),
503 _hm_write_stderr,
504 ),
505 ]
506}
507
/// Process-wide host state shared by the host-function implementations in
/// this file. Created lazily as `HostState::default()` on first access and
/// guarded by a `std::sync::Mutex`.
static GLOBAL: Lazy<Mutex<HostState>> = Lazy::new(|| Mutex::new(HostState::default()));
515
/// In-memory state behind [`GLOBAL`].
#[derive(Debug, Default)]
struct HostState {
    /// Values stored with `KvScope::Build`.
    build_kv: BTreeMap<String, Vec<u8>>,
    /// Values stored with `KvScope::Step`.
    step_kv: BTreeMap<String, Vec<u8>>,
    /// Per-handle byte buffers; `socket_write_impl` appends to them.
    sockets: HashMap<SocketHandle, Vec<u8>>,
    /// Counter used to mint socket handles; incremented before each use, so
    /// the first handle issued is 1.
    next_socket: u64,
    /// Loopback listeners registered by `spawn_loopback_impl_async`, keyed by
    /// the handle returned to the plugin.
    loopback_slots: HashMap<LoopbackHandle, Arc<LoopbackSlot>>,
}
531
/// Host-side bookkeeping for one loopback listener.
#[derive(Debug)]
struct LoopbackSlot {
    /// Receiving end of the one-shot channel that carries the callback data.
    /// Wrapped in `Option` so a receiver can be taken out of the slot while a
    /// caller waits on it.
    receiver: tokio::sync::Mutex<Option<tokio::sync::oneshot::Receiver<CallbackData>>>,
    /// Cancellation token cloned into the HTTP route and the server task.
    #[allow(
        dead_code,
        reason = "held to keep the token alive; cancellation is driven by the route closure's clone"
    )]
    shutdown_token: tokio_util::sync::CancellationToken,
}
548
549fn log_impl(level: Level, msg: &str) {
550 match level {
551 Level::Trace => tracing::trace!(target: "plugin", "{msg}"),
552 Level::Debug => tracing::debug!(target: "plugin", "{msg}"),
553 Level::Info => tracing::info!(target: "plugin", "{msg}"),
554 Level::Warn => tracing::warn!(target: "plugin", "{msg}"),
555 Level::Error => tracing::error!(target: "plugin", "{msg}"),
556 }
557}
558
559fn emit_step_log_impl(stream: StdStream, bytes: &[u8]) {
560 let Some(state) = crate::orchestrator::state::current() else {
561 return;
562 };
563 let Some(step_id) = current_step_id() else {
564 return;
565 };
566 let line = String::from_utf8_lossy(bytes).into_owned();
567 state.event_bus.emit(BuildEvent::StepLog {
568 step_id,
569 stream,
570 line,
571 ts: chrono::Utc::now(),
572 });
573}
574
575fn emit_event_impl(event: BuildEvent) {
576 if let Some(state) = crate::orchestrator::state::current() {
577 state.event_bus.emit(event);
578 }
579}
580
581fn kv_get_impl(scope: KvScope, key: &str) -> Option<Vec<u8>> {
582 match scope {
583 KvScope::Plugin => load_plugin_kv().get(key).cloned(),
584 KvScope::Build | KvScope::Step => {
585 let s = GLOBAL.lock().ok()?;
586 let m = match scope {
587 KvScope::Build => &s.build_kv,
588 KvScope::Step => &s.step_kv,
589 KvScope::Plugin => unreachable!(),
590 };
591 m.get(key).cloned()
592 }
593 }
594}
595
596#[doc(hidden)] pub fn kv_set_impl(scope: KvScope, key: &str, val: Vec<u8>) {
598 match scope {
599 KvScope::Plugin => {
600 let lock = lock_plugin_kv();
610 if lock.is_none() {
611 tracing::warn!(
612 target: "plugin::kv",
613 "plugin-scope KV lock acquisition failed; \
614 falling back to unprotected write. Concurrent \
615 writers may lose updates."
616 );
617 }
618 let mut kv = load_plugin_kv();
619 kv.insert(key.to_string(), val);
620 save_plugin_kv(&kv);
621 }
623 KvScope::Build | KvScope::Step => {
624 let Ok(mut s) = GLOBAL.lock() else { return };
625 let m = match scope {
626 KvScope::Build => &mut s.build_kv,
627 KvScope::Step => &mut s.step_kv,
628 KvScope::Plugin => unreachable!(),
629 };
630 m.insert(key.to_string(), val);
631 }
632 }
633}
634
635fn plugin_state_path() -> Option<std::path::PathBuf> {
655 let dir = dirs::config_dir()?.join("harmont").join("state");
656 let plugin = current_plugin_name()?;
657 Some(dir.join(format!("{plugin}.kv")))
658}
659
660fn lock_plugin_kv() -> Option<std::fs::File> {
667 use fs2::FileExt;
668 let kv_path = plugin_state_path()?;
669 let lock_path = kv_path.with_extension("lock");
670 if let Some(parent) = lock_path.parent() {
671 let _ = std::fs::create_dir_all(parent);
672 }
673 let f = std::fs::OpenOptions::new()
674 .create(true)
675 .truncate(false)
676 .read(true)
677 .write(true)
678 .open(&lock_path)
679 .ok()?;
680 f.lock_exclusive().ok()?;
681 Some(f)
682}
683
684fn current_plugin_name() -> Option<String> {
685 CURRENT_PLUGIN_NAME.with(|c| c.borrow().clone())
686}
687
thread_local! {
    /// Name of the plugin currently associated with this thread; `None` until
    /// `set_current_plugin_name` is called and again after
    /// `clear_current_plugin_name`. Used to pick the per-plugin KV file.
    pub(crate) static CURRENT_PLUGIN_NAME: std::cell::RefCell<Option<String>> =
        const { std::cell::RefCell::new(None) };
}
692
693#[doc(hidden)] pub fn set_current_plugin_name(name: String) {
695 CURRENT_PLUGIN_NAME.with(|c| *c.borrow_mut() = Some(name));
696}
697
698pub(crate) fn clear_current_plugin_name() {
699 CURRENT_PLUGIN_NAME.with(|c| *c.borrow_mut() = None);
700}
701
702#[doc(hidden)] #[must_use]
704pub fn load_plugin_kv() -> BTreeMap<String, Vec<u8>> {
705 let Some(path) = plugin_state_path() else {
706 return BTreeMap::default();
707 };
708 let Ok(bytes) = std::fs::read(&path) else {
709 return BTreeMap::default();
710 };
711 serde_json::from_slice(&bytes).unwrap_or_default()
712}
713
714fn save_plugin_kv(kv: &BTreeMap<String, Vec<u8>>) {
715 let Some(path) = plugin_state_path() else {
716 return;
717 };
718 let Some(parent) = path.parent() else { return };
719 let _ = std::fs::create_dir_all(parent);
720 if let Ok(bytes) = serde_json::to_vec(kv) {
721 let tmp = path.with_extension("kv.tmp");
724 if std::fs::write(&tmp, &bytes).is_ok() {
725 let _ = std::fs::rename(&tmp, &path);
726 }
727 }
728}
729
730fn archive_read_impl(args: ArchiveReadArgs) -> Vec<u8> {
731 crate::orchestrator::state::current()
732 .map(|s| s.archives.read(args.id, args.offset, args.max))
733 .unwrap_or_default()
734}
735
736fn archive_total_size_impl(id: ArchiveId) -> u64 {
737 crate::orchestrator::state::current()
738 .map(|s| s.archives.total_size(id))
739 .unwrap_or(0)
740}
741
/// Reads a file located under `<current dir>/.harmont`.
///
/// Both the root and the requested path are canonicalised, and the read is
/// refused unless the resolved file still lies inside the root, so `..`
/// components, absolute paths and symlinks cannot escape it. Returns `None`
/// when the root or file does not exist, the path escapes the root, or the
/// read fails.
fn fs_read_config_impl(rel_path: &str) -> Option<Vec<u8>> {
    let cwd = std::env::current_dir().ok()?;
    let root = cwd.join(".harmont").canonicalize().ok()?;
    let resolved = root.join(rel_path).canonicalize().ok()?;
    resolved
        .starts_with(&root)
        .then(|| std::fs::read(&resolved).ok())
        .flatten()
}
752
753fn unix_socket_connect_impl(_path: &str) -> SocketHandle {
754 let Ok(mut s) = GLOBAL.lock() else {
755 return SocketHandle(0);
756 };
757 s.next_socket += 1;
758 let h = SocketHandle(s.next_socket);
759 s.sockets.insert(h, Vec::new());
760 h
761}
762
763fn socket_write_impl(args: SocketWriteArgs) -> u64 {
764 let Ok(mut s) = GLOBAL.lock() else { return 0 };
765 if let Some(buf) = s.sockets.get_mut(&args.h) {
766 buf.extend_from_slice(&args.bytes);
767 args.bytes.len() as u64
768 } else {
769 0
770 }
771}
772
/// Stub: ignores the request and always returns an empty vector. Bytes stored
/// by `socket_write_impl` are not handed back here.
fn socket_read_impl(_args: SocketReadArgs) -> Vec<u8> {
    Vec::new()
}
778
779fn socket_close_impl(h: SocketHandle) {
780 let Ok(mut s) = GLOBAL.lock() else { return };
781 s.sockets.remove(&h);
782}
783
784fn keyring_get_impl(service: &str, account: &str) -> Option<String> {
785 crate::creds_store::get(service, account)
786}
787
788fn keyring_set_impl(service: &str, account: &str, secret: &str) {
789 crate::creds_store::set(service, account, secret);
790}
791
792fn keyring_delete_impl(service: &str, account: &str) {
793 crate::creds_store::delete(service, account);
794}
795
796fn tty_prompt_impl(msg: &str, mask: bool) -> String {
797 use dialoguer::{Input, Password};
798 if mask {
799 Password::new()
800 .with_prompt(msg)
801 .interact()
802 .unwrap_or_default()
803 } else {
804 Input::<String>::new()
805 .with_prompt(msg)
806 .interact_text()
807 .unwrap_or_default()
808 }
809}
810
811fn tty_confirm_impl(msg: &str, default: bool) -> bool {
812 use dialoguer::Confirm;
813 Confirm::new()
814 .with_prompt(msg)
815 .default(default)
816 .interact()
817 .unwrap_or(default)
818}
819
820fn browser_open_impl(url: &str) -> bool {
821 webbrowser::open(url).is_ok()
822}
823
824async fn spawn_loopback_impl_async(port: Option<u16>) -> anyhow::Result<LoopbackHandle> {
832 use anyhow::Context;
833 use axum::Router;
834 use axum::routing::get;
835 use std::net::SocketAddr;
836
837 let addr = SocketAddr::from(([127, 0, 0, 1], port.unwrap_or(0)));
838 let listener = tokio::net::TcpListener::bind(addr)
839 .await
840 .with_context(|| format!("bind loopback on {addr}"))?;
841 let bound_port = listener.local_addr()?.port();
842
843 let (tx, rx) = tokio::sync::oneshot::channel::<CallbackData>();
844 let tx_for_route: Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<CallbackData>>>> =
849 Arc::new(tokio::sync::Mutex::new(Some(tx)));
850 let shutdown = tokio_util::sync::CancellationToken::new();
851
852 let shutdown_for_route = shutdown.clone();
853 let app = Router::new().fallback(get(move |uri: axum::http::Uri| {
854 let tx = tx_for_route.clone();
855 let shutdown = shutdown_for_route.clone();
856 async move {
857 let path = uri.path().to_string();
858 let mut query: BTreeMap<String, String> = BTreeMap::new();
859 if let Some(q) = uri.query() {
860 for (k, v) in url::form_urlencoded::parse(q.as_bytes()) {
861 query.insert(k.into_owned(), v.into_owned());
862 }
863 }
864 let data = CallbackData { path, query };
865 if let Some(t) = tx.lock().await.take() {
866 let _ = t.send(data);
867 }
868 shutdown.cancel();
869 axum::response::Html(
870 "<!DOCTYPE html><html><body><h1>You can close this tab.</h1></body></html>",
871 )
872 }
873 }));
874
875 let shutdown_for_server = shutdown.clone();
876 tokio::spawn(async move {
877 let _ = axum::serve(listener, app)
878 .with_graceful_shutdown(shutdown_for_server.cancelled_owned())
879 .await;
880 });
881
882 let handle = LoopbackHandle(u64::from(bound_port));
883 let slot = Arc::new(LoopbackSlot {
884 receiver: tokio::sync::Mutex::new(Some(rx)),
885 shutdown_token: shutdown,
886 });
887 {
888 let mut g = GLOBAL
889 .lock()
890 .map_err(|_| anyhow::anyhow!("global host state lock poisoned"))?;
891 g.loopback_slots.insert(handle, slot);
892 }
893 Ok(handle)
894}
895
896async fn loopback_recv_impl_async(args: LoopbackRecvArgs) -> Option<CallbackData> {
900 let slot = {
901 let g = GLOBAL.lock().ok()?;
902 g.loopback_slots.get(&args.h).cloned()
903 }?;
904 let rx_opt = {
908 let mut rx_guard = slot.receiver.lock().await;
909 rx_guard.take()
910 };
911 let rx = rx_opt?;
912 match tokio::time::timeout(
913 std::time::Duration::from_millis(u64::from(args.timeout_ms)),
914 rx,
915 )
916 .await
917 {
918 Ok(Ok(data)) => Some(data),
919 _ => None,
920 }
921}
922
923fn should_cancel_impl() -> bool {
924 crate::orchestrator::state::current()
925 .map(|s| s.cancel.is_cancelled())
926 .unwrap_or(false)
927}
928
/// Writes `bytes` verbatim to the process's stdout and flushes. Both steps
/// are best-effort: I/O errors are ignored, and the flush is attempted even
/// if the write failed.
#[allow(
    clippy::print_stdout,
    reason = "this fn's purpose is user-facing stdout output"
)]
fn write_stdout_impl(bytes: &[u8]) {
    use std::io::Write;

    let stdout = std::io::stdout();
    let mut sink = stdout.lock();
    let _ = sink.write_all(bytes);
    let _ = sink.flush();
}
941
/// Writes `bytes` verbatim to the process's stderr and flushes. Both steps
/// are best-effort: I/O errors are ignored, and the flush is attempted even
/// if the write failed.
#[allow(
    clippy::print_stderr,
    reason = "this fn's purpose is user-facing stderr output"
)]
fn write_stderr_impl(bytes: &[u8]) {
    use std::io::Write;

    let stderr = std::io::stderr();
    let mut sink = stderr.lock();
    let _ = sink.write_all(bytes);
    let _ = sink.flush();
}
952
thread_local! {
    /// Id of the build step associated with this thread, if any.
    /// `emit_step_log_impl` reads it to tag step-log events and drops the log
    /// line when it is unset.
    static CURRENT_STEP_ID: std::cell::Cell<Option<uuid::Uuid>> =
        const { std::cell::Cell::new(None) };
}
965
966#[allow(dead_code)]
969pub(crate) fn set_current_step_id(id: uuid::Uuid) {
970 CURRENT_STEP_ID.with(|c| c.set(Some(id)));
971}
972
973#[allow(dead_code)]
974pub(crate) fn clear_current_step_id() {
975 CURRENT_STEP_ID.with(|c| c.set(None));
976}
977
978pub(crate) fn current_step_id() -> Option<uuid::Uuid> {
979 CURRENT_STEP_ID.with(std::cell::Cell::get)
980}
981
// Tests for the plugin-scope (on-disk) KV store.
//
// NOTE(review): both tests set the process-wide `XDG_CONFIG_HOME` variable
// and the test harness runs tests on parallel threads by default, so they can
// race on it — confirm they are serialised or otherwise isolated.
// NOTE(review): this presumes `dirs::config_dir()` honours `XDG_CONFIG_HOME`
// on the platforms where the tests run — verify for non-Linux targets.
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::panic,
    unsafe_code,
    reason = "tests poke env vars via std::env::set_var, which is unsafe in Rust 2024"
)]
mod plugin_kv_tests {
    use super::*;

    // A value written with plugin scope can be read back, and repeated reads
    // return the same value.
    #[test]
    fn plugin_kv_round_trip_through_disk() {
        let temp = tempfile::tempdir().unwrap();
        // SAFETY: NOTE(review) — `set_var` is only sound while no other
        // thread touches the environment; confirm for parallel test runs.
        unsafe {
            std::env::set_var("XDG_CONFIG_HOME", temp.path());
        }
        set_current_plugin_name("test-plugin".into());

        kv_set_impl(KvScope::Plugin, "key", b"value".to_vec());
        assert_eq!(kv_get_impl(KvScope::Plugin, "key"), Some(b"value".to_vec()));

        // Each plugin-scope get reloads the store via `load_plugin_kv`.
        let again = kv_get_impl(KvScope::Plugin, "key");
        assert_eq!(again, Some(b"value".to_vec()));

        clear_current_plugin_name();
    }

    // Values written under one plugin name are invisible under another name
    // and visible again after switching back.
    #[test]
    fn plugin_kv_isolated_per_plugin_name() {
        let temp = tempfile::tempdir().unwrap();
        // SAFETY: NOTE(review) — same caveat as in the round-trip test above
        // in this module.
        unsafe {
            std::env::set_var("XDG_CONFIG_HOME", temp.path());
        }

        set_current_plugin_name("alpha".into());
        kv_set_impl(KvScope::Plugin, "k", b"a".to_vec());

        set_current_plugin_name("beta".into());
        assert_eq!(kv_get_impl(KvScope::Plugin, "k"), None);

        set_current_plugin_name("alpha".into());
        assert_eq!(kv_get_impl(KvScope::Plugin, "k"), Some(b"a".to_vec()));

        clear_current_plugin_name();
    }
}
1035
// End-to-end test for the loopback callback listener.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod loopback_tests {
    use super::*;

    // Spawns a listener on an OS-chosen port, issues a GET against it, and
    // checks that the path and query parameters come back through
    // `loopback_recv_impl_async`.
    #[tokio::test(flavor = "multi_thread")]
    async fn spawn_then_recv_callback() {
        let handle = spawn_loopback_impl_async(None).await.unwrap();
        // The handle wraps the bound port number.
        let port = handle.0;

        let url = format!("http://127.0.0.1:{port}/cb?code=xyz&state=abc");
        // Fire the request from a separate task so the receive below can wait
        // for it concurrently; the HTTP response itself is not inspected.
        let _client = tokio::spawn(async move {
            let _ = reqwest::get(&url).await;
        });

        let data = loopback_recv_impl_async(LoopbackRecvArgs {
            h: handle,
            timeout_ms: 5000,
        })
        .await
        .expect("got callback");
        assert_eq!(data.path, "/cb");
        assert_eq!(data.query.get("code"), Some(&"xyz".to_string()));
        assert_eq!(data.query.get("state"), Some(&"abc".to_string()));
    }
}