Skip to main content

vs_daemon/daemon/
pending.rs

1//! Pending-input queue. Backs the `vs_prompt_input` MCP path: the
2//! `vs mcp` subprocess has no tty, so an MCP-driven agent that calls
3//! `vs_prompt_input` enqueues a pending entry and blocks (with a
4//! timeout) on a condvar. The user — at their interactive shell —
5//! runs `vs pending fulfill <id>` (or `vs pending list` to see what's
6//! queued), types the value into the local tty, and that fulfills
7//! the entry. The condvar wakes the parked MCP request, the daemon
8//! actually fills the field, and the agent's tool call returns
9//! success.
10//!
11//! Local `vs prompt-input` never touches this queue — it reads from
12//! the tty in-process. The queue exists only for the "no tty" case.
13
14use std::collections::HashMap;
15use std::sync::{Arc, Condvar, Mutex};
16use std::time::{Duration, Instant};
17
18/// One entry in the pending-input queue. Visible-to-the-user fields
19/// (`page`, `r`, `message`, `secret`) are surfaced by `vs pending
20/// list`; the daemon-internal fields (`token`, `group`) are passed
21/// through to the `vs_act fill` call on fulfillment.
22#[derive(Debug, Clone)]
23pub struct PendingEntry {
24    pub id: String,
25    pub page: String,
26    pub r: u32,
27    pub message: String,
28    pub secret: bool,
29    pub token: String,
30    pub group: Option<String>,
31    pub created_at: Instant,
32}
33
34/// Outcome of a pending entry once it leaves the queue.
35#[derive(Debug, Clone)]
36pub enum FulfillState {
37    Pending,
38    Fulfilled(String),
39    Cancelled,
40}
41
42/// The queue itself. `Inner.queue` holds the registry; `Inner.cv` is
43/// the wake signal for parked `vs_prompt_input` calls. Wrapped in
44/// `Arc<Mutex>` so multiple daemon threads can share it.
45#[derive(Default)]
46pub struct PendingQueue {
47    inner: Mutex<HashMap<String, (PendingEntry, FulfillState)>>,
48    cv: Condvar,
49}
50
51impl PendingQueue {
52    #[must_use]
53    pub fn new() -> Arc<Self> {
54        Arc::new(Self::default())
55    }
56
57    /// Enqueue a pending entry and block on the condvar until it is
58    /// fulfilled, cancelled, or `timeout` elapses. Returns the value
59    /// on fulfillment, `None` on cancellation or timeout.
60    #[must_use]
61    pub fn enqueue_and_wait(&self, entry: PendingEntry, timeout: Duration) -> Option<String> {
62        let id = entry.id.clone();
63        {
64            let mut guard = self.inner.lock().unwrap();
65            guard.insert(id.clone(), (entry, FulfillState::Pending));
66        }
67        let deadline = Instant::now() + timeout;
68        let mut guard = self.inner.lock().unwrap();
69        loop {
70            let remaining = match deadline.checked_duration_since(Instant::now()) {
71                Some(r) if !r.is_zero() => r,
72                _ => {
73                    guard.remove(&id);
74                    return None;
75                }
76            };
77            let (g, _) = self.cv.wait_timeout(guard, remaining).unwrap();
78            guard = g;
79            let (_, state) = guard.get(&id)?;
80            match state.clone() {
81                FulfillState::Pending => {}
82                FulfillState::Fulfilled(v) => {
83                    guard.remove(&id);
84                    return Some(v);
85                }
86                FulfillState::Cancelled => {
87                    guard.remove(&id);
88                    return None;
89                }
90            }
91        }
92    }
93
94    /// Snapshot of all pending entries (id + user-visible metadata).
95    #[must_use]
96    pub fn list(&self) -> Vec<PendingEntry> {
97        let guard = self.inner.lock().unwrap();
98        guard
99            .values()
100            .filter(|(_, s)| matches!(s, FulfillState::Pending))
101            .map(|(e, _)| e.clone())
102            .collect()
103    }
104
105    /// Fulfill a pending entry with `value`. Wakes parked waiters.
106    pub fn fulfill(&self, id: &str, value: String) -> bool {
107        let mut guard = self.inner.lock().unwrap();
108        if let Some((_, state)) = guard.get_mut(id) {
109            if matches!(state, FulfillState::Pending) {
110                *state = FulfillState::Fulfilled(value);
111                self.cv.notify_all();
112                return true;
113            }
114        }
115        false
116    }
117
118    /// Cancel a pending entry.
119    pub fn cancel(&self, id: &str) -> bool {
120        let mut guard = self.inner.lock().unwrap();
121        if let Some((_, state)) = guard.get_mut(id) {
122            if matches!(state, FulfillState::Pending) {
123                *state = FulfillState::Cancelled;
124                self.cv.notify_all();
125                return true;
126            }
127        }
128        false
129    }
130
131    /// Peek a pending entry (no removal).
132    #[must_use]
133    pub fn peek(&self, id: &str) -> Option<PendingEntry> {
134        let guard = self.inner.lock().unwrap();
135        guard
136            .get(id)
137            .filter(|(_, s)| matches!(s, FulfillState::Pending))
138            .map(|(e, _)| e.clone())
139    }
140}
141
142use std::sync::atomic::{AtomicU64, Ordering};
143use std::time::SystemTime;
144
145static COUNTER: AtomicU64 = AtomicU64::new(0);
146
147/// Generate a short, URL-safe id for a new entry.
148#[must_use]
149pub fn new_id() -> String {
150    let nanos = SystemTime::now()
151        .duration_since(SystemTime::UNIX_EPOCH)
152        .map_or(0_u128, |d| d.as_nanos());
153    let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
154    #[allow(clippy::cast_possible_truncation)]
155    let n = nanos as u64;
156    let combined = n.wrapping_mul(0x9E37_79B9_7F4A_7C15) ^ counter;
157    format!("p_{combined:016x}")
158}