1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock, RwLock};
/// Process-global approval registry shared between the gateway (which registers
/// pending approvals) and the channel listeners (Discord/Slack/Telegram) which
/// handle keyword replies.
///
/// Both components run in the same process when started via `construct daemon`.
/// This singleton is created on first access and lives for the process lifetime.
static GLOBAL_REGISTRY: OnceLock<Arc<ApprovalRegistry>> = OnceLock::new();
/// Return the process-global `ApprovalRegistry`, creating it on first call.
pub fn global() -> Arc<ApprovalRegistry> {
Arc::clone(GLOBAL_REGISTRY.get_or_init(|| Arc::new(ApprovalRegistry::new())))
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingApproval {
pub run_id: String,
pub step_id: String,
pub workflow_name: String,
pub approve_keywords: Vec<String>,
pub reject_keywords: Vec<String>,
pub cwd: String,
pub created_at: DateTime<Utc>,
// Channel scoping — populated AFTER the approval prompt is sent so we can
// restrict keyword matching to the thread / reply that belongs to this
// specific approval. Without these the bot matches any message in the
// configured notification channel, which conflates parallel approvals
// and triggers on unrelated chatter.
pub discord_channel_id: Option<String>,
pub discord_thread_id: Option<String>,
pub discord_prompt_message_id: Option<String>,
pub slack_channel_id: Option<String>,
pub slack_thread_ts: Option<String>,
pub telegram_chat_id: Option<String>,
pub telegram_prompt_message_id: Option<i64>,
}
impl PendingApproval {
pub fn new(
run_id: String,
step_id: String,
workflow_name: String,
approve_keywords: Vec<String>,
reject_keywords: Vec<String>,
cwd: String,
) -> Self {
Self {
run_id,
step_id,
workflow_name,
approve_keywords,
reject_keywords,
cwd,
created_at: Utc::now(),
discord_channel_id: None,
discord_thread_id: None,
discord_prompt_message_id: None,
slack_channel_id: None,
slack_thread_ts: None,
telegram_chat_id: None,
telegram_prompt_message_id: None,
}
}
}
/// Thread-safe registry of pending workflow approvals.
///
/// When a workflow hits a human_approval step, the operator pushes an event
/// to the gateway. The gateway registers the pending approval here.
/// When a user responds (via Discord/Slack/Telegram reply or dashboard REST),
/// the gateway looks up the approval, atomically claims it, and calls
/// resume_workflow.
pub struct ApprovalRegistry {
pending: RwLock<HashMap<String, PendingApproval>>, // keyed by run_id
}
impl ApprovalRegistry {
pub fn new() -> Self {
Self {
pending: RwLock::new(HashMap::new()),
}
}
/// Register a new pending approval. Channel thread/reply IDs are attached
/// later via the `attach_*` methods once the channel adapter has sent the
/// prompt and received the message/thread identifiers back.
pub fn register(&self, approval: PendingApproval) {
let mut map = self.pending.write().unwrap();
map.insert(approval.run_id.clone(), approval);
}
/// Attach Discord thread + prompt message IDs to an existing pending approval.
pub fn attach_discord(
&self,
run_id: &str,
channel_id: Option<String>,
thread_id: Option<String>,
prompt_message_id: Option<String>,
) {
let mut map = self.pending.write().unwrap();
if let Some(a) = map.get_mut(run_id) {
if channel_id.is_some() {
a.discord_channel_id = channel_id;
}
a.discord_thread_id = thread_id;
a.discord_prompt_message_id = prompt_message_id;
}
}
/// Attach Slack channel + thread_ts to an existing pending approval.
pub fn attach_slack(
&self,
run_id: &str,
channel_id: Option<String>,
thread_ts: Option<String>,
) {
let mut map = self.pending.write().unwrap();
if let Some(a) = map.get_mut(run_id) {
a.slack_channel_id = channel_id;
a.slack_thread_ts = thread_ts;
}
}
/// Attach Telegram chat + prompt message_id to an existing pending approval.
pub fn attach_telegram(
&self,
run_id: &str,
chat_id: Option<String>,
prompt_message_id: Option<i64>,
) {
let mut map = self.pending.write().unwrap();
if let Some(a) = map.get_mut(run_id) {
a.telegram_chat_id = chat_id;
a.telegram_prompt_message_id = prompt_message_id;
}
}
/// Atomically claim a pending approval. Returns Some if the approval
/// existed and hadn't been claimed yet. Returns None if already claimed
/// or not found. This prevents race conditions between channel adapters
/// and the dashboard.
pub fn try_claim(&self, run_id: &str) -> Option<PendingApproval> {
let mut map = self.pending.write().unwrap();
map.remove(run_id)
}
/// Match a Discord message to a pending approval. Only matches when the
/// message is either in the thread that was created for the approval OR
/// is a reply to the original prompt message. This prevents unrelated
/// chatter in the notification channel from triggering approvals and
/// cleanly disambiguates parallel approvals.
///
/// Pass `None` for `thread_id` if the incoming message is in the root
/// channel (not a thread). Pass `None` for `reply_to_message_id` if the
/// incoming message is not a reply.
pub fn match_discord_keyword(
&self,
channel_id: &str,
thread_id: Option<&str>,
reply_to_message_id: Option<&str>,
message: &str,
) -> Option<(String, bool, String)> {
let map = self.pending.read().unwrap();
let msg_lower = message.trim().to_lowercase();
for (run_id, approval) in map.iter() {
let in_expected_thread = match (&approval.discord_thread_id, thread_id) {
(Some(want), Some(got)) => want == got,
_ => false,
};
let is_reply_to_prompt =
match (&approval.discord_prompt_message_id, reply_to_message_id) {
(Some(want), Some(got)) => want == got,
_ => false,
};
// Back-compat: if we never captured thread/message IDs (e.g. send
// failed or thread creation was denied), fall back to matching by
// channel only. This preserves existing behavior for unscoped
// deployments but is strictly worse disambiguation.
let fallback_channel_only = approval.discord_thread_id.is_none()
&& approval.discord_prompt_message_id.is_none()
&& approval
.discord_channel_id
.as_ref()
.map(|id| id == channel_id)
.unwrap_or(false);
if !(in_expected_thread || is_reply_to_prompt || fallback_channel_only) {
continue;
}
if let Some(res) = match_keywords(
&msg_lower,
message,
&approval.approve_keywords,
&approval.reject_keywords,
) {
return Some((run_id.clone(), res.0, res.1));
}
}
None
}
/// Match a Slack message to a pending approval. Requires the incoming
/// message to carry `thread_ts` equal to the approval's captured ts.
pub fn match_slack_keyword(
&self,
channel_id: &str,
thread_ts: Option<&str>,
message: &str,
) -> Option<(String, bool, String)> {
let map = self.pending.read().unwrap();
let msg_lower = message.trim().to_lowercase();
for (run_id, approval) in map.iter() {
let channel_match = approval
.slack_channel_id
.as_ref()
.map(|id| id == channel_id)
.unwrap_or(false);
if !channel_match {
continue;
}
let thread_match = match (&approval.slack_thread_ts, thread_ts) {
(Some(want), Some(got)) => want == got,
_ => false,
};
if !thread_match {
continue;
}
if let Some(res) = match_keywords(
&msg_lower,
message,
&approval.approve_keywords,
&approval.reject_keywords,
) {
return Some((run_id.clone(), res.0, res.1));
}
}
None
}
/// Match a Telegram message to a pending approval. Requires the incoming
/// message to be a reply to the approval's prompt message.
pub fn match_telegram_keyword(
&self,
chat_id: &str,
reply_to_message_id: Option<i64>,
message: &str,
) -> Option<(String, bool, String)> {
let map = self.pending.read().unwrap();
let msg_lower = message.trim().to_lowercase();
for (run_id, approval) in map.iter() {
let chat_match = approval
.telegram_chat_id
.as_ref()
.map(|id| id == chat_id)
.unwrap_or(false);
if !chat_match {
continue;
}
let reply_match = match (approval.telegram_prompt_message_id, reply_to_message_id) {
(Some(want), Some(got)) => want == got,
_ => false,
};
if !reply_match {
continue;
}
if let Some(res) = match_keywords(
&msg_lower,
message,
&approval.approve_keywords,
&approval.reject_keywords,
) {
return Some((run_id.clone(), res.0, res.1));
}
}
None
}
/// Remove a pending approval (cleanup after resolution).
pub fn remove(&self, run_id: &str) {
let mut map = self.pending.write().unwrap();
map.remove(run_id);
}
/// List all pending approvals (for debugging/status).
pub fn list_pending(&self) -> Vec<PendingApproval> {
let map = self.pending.read().unwrap();
map.values().cloned().collect()
}
}
/// Check a normalized message against approve/reject keyword lists. Returns
/// `(is_approve, feedback)` on match.
fn match_keywords(
msg_lower: &str,
original: &str,
approve_keywords: &[String],
reject_keywords: &[String],
) -> Option<(bool, String)> {
for kw in approve_keywords {
if msg_lower == kw || msg_lower.starts_with(&format!("{} ", kw)) {
return Some((true, String::new()));
}
}
for kw in reject_keywords {
if msg_lower == kw {
return Some((false, String::new()));
}
if msg_lower.starts_with(&format!("{} ", kw)) {
let feedback = original.trim()[kw.len()..].trim().to_string();
return Some((false, feedback));
}
}
None
}