klieo-mcp-server 2.2.0

Expose any klieo ToolInvoker or Agent as an MCP server over stdio or HTTP. The inverse of klieo-tools-mcp.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
//! Workflow-as-tool adapter: drives `klieo_hitl::run_with_hitl` from `tools/call`.
//!
//! Parallel to `crate::AgentAsToolInvoker`, but the run body is the
//! HITL-aware loop (`run_with_hitl`) rather than the bare `Agent::run`, so a
//! `ReviewPolicy` can suspend the run for human review.

use async_trait::async_trait;
use std::sync::Arc;

use chrono::Utc;
use klieo_core::agent::Agent;
use klieo_core::checkpoint::{ApprovalDecision, RunCheckpoint};
use klieo_core::error::{Error as KlieoError, ToolError};
use klieo_core::ids::ThreadId;
use klieo_core::llm::{Message, Role};
use klieo_core::runtime::{resume_from_checkpoint, RunOptions};
use klieo_core::tool::{ToolCtx, ToolInvoker};
use klieo_core::ToolDef;
use klieo_hitl::{run_with_hitl, HitlConfig};
use klieo_hitl_client::HitlClient;

use crate::resume_ticket::{ResumeTicketRecord, ResumeTicketStore};
use crate::AgentContextFactory;

/// Stable wire-safe reason returned to MCP peers when a workflow run
/// suspends. The raw `ReviewPolicy` reason is logged server-side but
/// never echoed: this crate cannot verify the policy installs a
/// non-PII label, and ADR-022 / CWE-209 require the wire surface stay
/// clean of operator-supplied free text.
///
/// Emitted as the `"reason"` field of both suspend envelopes built in
/// this module: the ticketed one and the no-ticket fallback.
const SUSPENDED_REASON_WIRE: &str = "workflow suspended for human review";

/// Shared HITL wiring carried by every [`WorkflowAsToolInvoker`] the
/// builder produces. Lives behind an [`Arc`] so a single
/// `with_hitl(..)` call propagates to every registered workflow
/// without cloning the underlying [`HitlClient`] (the client owns a
/// `reqwest::Client` per origin/token pair — caller-shared by design).
///
/// Both fields are `Arc`s, so cloning a bundle only bumps two
/// reference counts.
#[derive(Clone)]
pub(crate) struct HitlBundle {
    /// HITL client passed to [`run_with_hitl`] on every `tools/call`.
    pub(crate) client: Arc<HitlClient>,
    /// HITL loop configuration passed to [`run_with_hitl`] alongside
    /// `client`.
    pub(crate) cfg: Arc<HitlConfig>,
}

/// Result of materialising one workflow registration. Carries both
/// the `ToolInvoker` half (driven by `tools/call`) and the
/// [`WorkflowResumeHandle`] half (driven by `klieo/run/resume`).
/// Both halves are `Arc` clones of the same underlying
/// [`WorkflowAsToolInvoker`] so `ctx_factory` / `system_prompt` /
/// `run_options` stay shared with no drift between the two entry
/// points.
pub(crate) struct WorkflowMaterialisation {
    /// `tools/call` half: advertises the workflow and runs it.
    pub(crate) invoker: Arc<dyn ToolInvoker>,
    /// `klieo/run/resume` half: continues runs the invoker suspended.
    pub(crate) resume_handle: Arc<dyn WorkflowResumeHandle>,
    /// Workflow name. Presumably the same string the tool is exposed
    /// under and the key the resume handler matches a ticket's
    /// `workflow_name` against — TODO confirm against the builder.
    pub(crate) name: String,
}

/// Closure that materialises a pending workflow registration once the
/// shared [`HitlBundle`] (and, when the `governor` feature is enabled,
/// the inbound [`crate::governor::GovernorBundle`]) is known at
/// `build()` time. The builder stores these instead of
/// fully-constructed invokers so workflow registration is
/// order-independent relative to `with_hitl` / `with_governor`.
///
/// The materialiser signature is feature-uniform via the
/// [`crate::GovernorBundleHolder`] type alias: under
/// `feature = "governor"` it carries the live bundle; off-feature it
/// erases to a unit struct (the off-feature build path never reaches
/// here because [`crate::McpBuildError::WorkflowWithoutGovernor`]
/// fires first).
///
/// Arguments, in order: the shared HITL wiring, the optional
/// resume-ticket store (presumably `None` when no checkpoint KV was
/// configured — confirm against the builder), and the optional governor
/// holder. Being `FnOnce`, each closure can be invoked at most once.
/// `Send + 'static` lets the builder own it until `build()`.
pub(crate) type WorkflowMaterialiser = Box<
    dyn FnOnce(
            HitlBundle,
            Option<Arc<ResumeTicketStore>>,
            Option<crate::GovernorBundleHolder>,
        ) -> WorkflowMaterialisation
        + Send
        + 'static,
>;

/// One staged workflow registration. Kept type-erased on the builder so
/// the generic `A` does not surface on the builder type.
pub(crate) struct WorkflowRegistration {
    /// Deferred constructor. Consumed (at most once) at `build()` time
    /// to produce this workflow's [`WorkflowMaterialisation`].
    pub(crate) materialise: WorkflowMaterialiser,
}

/// Resume-side half of a registered workflow. Drives
/// [`klieo_core::runtime::resume_from_checkpoint`] against a
/// `system_prompt` + `run_options` + `ctx_factory` captured at
/// registration time. The HTTP `klieo/run/resume` handler looks
/// implementations up by `workflow_name` after the ticket-bound
/// authz gate has cleared, so this trait never sees raw caller
/// identity.
#[async_trait]
pub(crate) trait WorkflowResumeHandle: Send + Sync {
    /// Re-enter the suspended run carried by `checkpoint`, applying
    /// `decision` to the paused step. Returns the workflow's final
    /// JSON result envelope — the same shape `tools/call` would have
    /// returned on the original happy path.
    ///
    /// `tenant_label` is the non-PII attribution label the resuming
    /// caller's ticket was bound to (`principal_hash` of the verified
    /// principal). The implementation stamps it onto the resumed
    /// context so the continuation is attributed AND governed by the
    /// same per-tenant LLM budget as the original `tools/call` — a
    /// suspend/resume round-trip must not bypass the inbound ship gate.
    ///
    /// # Errors
    ///
    /// Returns a [`ToolError`] when the resumed run does not complete.
    /// The error crosses the wire, so implementations should keep its
    /// text generic (CWE-209) and log the underlying cause server-side.
    // Only the HTTP resume handler calls this method, so builds without
    // `feature = "http"` would otherwise trip the `dead_code` lint.
    #[cfg_attr(not(feature = "http"), allow(dead_code))]
    async fn resume(
        &self,
        checkpoint: RunCheckpoint,
        decision: ApprovalDecision,
        tenant_label: String,
    ) -> Result<serde_json::Value, ToolError>;
}

/// Internal adapter: presents one [`Agent`] as a single-tool
/// [`ToolInvoker`] whose body drives [`run_with_hitl`] instead of
/// [`Agent::run`]. Mirrors [`crate::AgentAsToolInvoker`] field-for-field
/// plus the shared HITL bundle, agent system prompt, and per-run
/// options that carry the [`klieo_core::runtime::ReviewPolicy`].
pub(crate) struct WorkflowAsToolInvoker<A>
where
    A: Agent + 'static,
    A::Input: serde::de::DeserializeOwned + Send + 'static,
{
    /// Tool name advertised in the catalogue. `tools/call` requests
    /// for any other name are rejected as unknown.
    pub(crate) name: String,
    /// System prompt used both for the initial run and for resumes.
    pub(crate) system_prompt: String,
    /// Schema advertised in the catalogue entry. Call arguments are
    /// gated by decoding them as `A::Input`.
    pub(crate) input_schema: serde_json::Value,
    /// Mints a fresh agent context for every `tools/call` and every
    /// resume.
    pub(crate) ctx_factory: AgentContextFactory,
    /// Per-run options (cloned into each run and each resume).
    pub(crate) run_options: RunOptions,
    /// Shared HITL client + configuration handed to [`run_with_hitl`].
    pub(crate) hitl: HitlBundle,
    /// Issued at registration time when the server was built with
    /// `with_checkpoint_kv`. Absence keeps the slice-1 no-ticket
    /// fallback so deployments not yet wired for resume keep working.
    pub(crate) ticket_store: Option<Arc<ResumeTicketStore>>,
    /// Inbound per-tenant LLM budget governor (`feature = "governor"`).
    /// `None` is unreachable on the workflow path — the builder's
    /// [`crate::McpBuildError::WorkflowWithoutGovernor`] hard gate
    /// refuses to ship a workflow-exposing server without it. The
    /// field itself only exists under `feature = "governor"`; it is an
    /// `Option` so that a missing bundle makes the governor wrap in
    /// `invoke` / `resume` a no-op rather than a panic.
    #[cfg(feature = "governor")]
    pub(crate) governor: Option<crate::governor::GovernorBundle>,
    /// Held only so trait bounds on `A` (and any future `Agent`-shape
    /// access — name, system_prompt) stay resolvable; the run loop is
    /// driven via [`run_with_hitl`] which never calls `Agent::run`.
    /// `fn() -> A` (rather than `A`) keeps the invoker `Send + Sync`
    /// regardless of `A`, since no `A` value is ever stored.
    _agent: std::marker::PhantomData<fn() -> A>,
}

impl<A> WorkflowAsToolInvoker<A>
where
    A: Agent + 'static,
    A::Input: serde::de::DeserializeOwned + Send + 'static,
{
    /// Assemble an invoker from its registration-time parts. Performs
    /// no I/O; `governor` is a parameter only under `feature = "governor"`.
    #[allow(clippy::too_many_arguments)] // one argument per stored field
    pub(crate) fn new(
        name: String,
        system_prompt: String,
        input_schema: serde_json::Value,
        ctx_factory: AgentContextFactory,
        run_options: RunOptions,
        hitl: HitlBundle,
        ticket_store: Option<Arc<ResumeTicketStore>>,
        #[cfg(feature = "governor")] governor: Option<crate::governor::GovernorBundle>,
    ) -> Self {
        Self {
            _agent: std::marker::PhantomData,
            #[cfg(feature = "governor")]
            governor,
            ticket_store,
            hitl,
            run_options,
            ctx_factory,
            input_schema,
            system_prompt,
            name,
        }
    }

    /// Persist `checkpoint` under a freshly minted one-shot ticket bound
    /// to `principal`, and return the suspend envelope carrying that
    /// ticket (never the checkpoint itself).
    ///
    /// Best-effort: with no ticket store wired, or when the store write
    /// fails, the reason-only slice-1 envelope is returned instead. A
    /// transient KV failure therefore never becomes a peer-visible error.
    async fn issue_resume_ticket(
        &self,
        principal: String,
        checkpoint: RunCheckpoint,
    ) -> serde_json::Value {
        let store = match self.ticket_store.as_ref() {
            Some(store) => store,
            None => return suspended_no_ticket(),
        };

        let ticket = ResumeTicketStore::mint_token();
        let created_at = Utc::now();
        let record = ResumeTicketRecord {
            principal,
            workflow_name: self.name.clone(),
            checkpoint,
            created_at,
        };

        match store.persist(&ticket, &record).await {
            Ok(_) => serde_json::json!({
                "status": "suspended",
                "ticket": ticket,
                "reason": SUSPENDED_REASON_WIRE,
            }),
            Err(err) => {
                tracing::warn!(
                    workflow = %self.name,
                    error = %err,
                    "persist of resume ticket failed; falling back to no-ticket envelope",
                );
                suspended_no_ticket()
            }
        }
    }
}

/// Wire envelope returned on suspend when no resume ticket could be
/// issued: no `with_checkpoint_kv` wired, no verified caller principal,
/// or a transient ticket-store write failure.
///
/// It carries only `status` and the fixed [`SUSPENDED_REASON_WIRE`]
/// reason. This is the same reason-only envelope the slice-1 (pre-ticket)
/// server returned, so deployments not wired for resume stay
/// backward-compatible with existing peers.
fn suspended_no_ticket() -> serde_json::Value {
    serde_json::json!({
        "status": "suspended",
        "reason": SUSPENDED_REASON_WIRE,
    })
}

#[async_trait]
impl<A> ToolInvoker for WorkflowAsToolInvoker<A>
where
    A: Agent + 'static,
    A::Input: serde::de::DeserializeOwned + Send + 'static,
{
    /// Advertises exactly one tool: this workflow, described as
    /// `klieo workflow: <name>`, with the registered input schema.
    fn catalogue(&self) -> Vec<ToolDef> {
        vec![ToolDef::new(
            self.name.clone(),
            format!("klieo workflow: {}", self.name),
            self.input_schema.clone(),
        )]
    }

    /// Runs the workflow once through [`run_with_hitl`] for a `tools/call`.
    ///
    /// Returns the run's final text as a JSON string. When a `ReviewPolicy`
    /// suspends the run, it returns `Ok` with a `"status": "suspended"`
    /// envelope (with a resume ticket when one can be issued).
    ///
    /// # Errors
    ///
    /// - [`ToolError::UnknownTool`] when `name` is not this workflow.
    /// - [`ToolError::InvalidArgs`] when `args` do not decode as `A::Input`.
    /// - [`ToolError::Permanent`] when seeding the input turn fails, or the
    ///   run fails for any reason other than a suspension. Internal error
    ///   text is logged, never returned.
    async fn invoke(
        &self,
        name: &str,
        args: serde_json::Value,
        tool_ctx: ToolCtx,
    ) -> Result<serde_json::Value, ToolError> {
        if name != self.name {
            return Err(ToolError::UnknownTool(name.into()));
        }
        // Decode only to gate malformed args against `inputSchema`: the loop
        // drives `run_steps` off short-term memory and never sees `A::Input`,
        // but rejecting here keeps the wire-error mapping identical to
        // `AgentAsToolInvoker` (CWE-209: no internal error text over the wire).
        // Deserialize straight from `&args` (no clone of the whole value) and
        // bind to `_` so the decoded input is dropped at once instead of being
        // held across every `.await` below.
        let _: A::Input = <A::Input as serde::de::Deserialize>::deserialize(&args).map_err(|err| {
            tracing::warn!(
                workflow = %self.name,
                error = %err,
                "decode of MCP tools/call args failed",
            );
            ToolError::InvalidArgs("arguments do not match inputSchema".into())
        })?;

        // Lifted off `tool_ctx` BEFORE building the agent context: the
        // verified principal is server-side authz metadata and must
        // never reach short-term/episodic memory or any LLM-visible
        // surface. Only the hashed label enters the audit trail via
        // `Episode::RunAttributed` (ADR-045).
        let caller_principal = tool_ctx.caller_principal.clone();

        let mut ctx = (self.ctx_factory)();
        ctx.cancel = tool_ctx.cancel.child_token();
        ctx.progress = tool_ctx.progress.clone();
        if let Some(principal) = caller_principal.as_ref() {
            ctx = ctx.with_tenant_label(klieo_core::principal_hash(principal.as_str()));
        }
        // The cross-hop anchor reaches `tool_ctx` only for authenticated
        // callers (HTTP boundary gate), so a stamped `Episode::RunOrigin`
        // is always co-attributable to the `RunAttributed` principal
        // above. Recorded verbatim; never reaches an LLM-visible surface.
        if let Some(anchor) = tool_ctx.parent_anchor.as_ref() {
            ctx = ctx.with_parent_anchor(anchor.as_str().to_string());
        }
        // Per-tenant LLM budget governor. The build-time hard gate
        // (`WorkflowWithoutGovernor`) makes
        // `self.governor.is_some()` invariant on this path; the
        // explicit `if let` keeps the wrap a no-op rather than a
        // panic if a future caller bypasses the gate.
        #[cfg(feature = "governor")]
        if let Some(bundle) = self.governor.as_ref() {
            ctx = crate::governor::wrap_ctx_with_governor(ctx, bundle);
        }
        let thread = ThreadId::new(format!("{}:{}", self.name, ctx.run_id));

        seed_user_message(&ctx, &thread, &args, &self.name).await?;

        let result = run_with_hitl(
            &ctx,
            &self.system_prompt,
            thread,
            self.run_options.clone(),
            &self.hitl.client,
            &self.hitl.cfg,
        )
        .await;

        self.map_hitl_result(
            caller_principal.map(|principal| principal.as_str().to_string()),
            result,
        )
        .await
    }
}

#[async_trait]
impl<A> WorkflowResumeHandle for WorkflowAsToolInvoker<A>
where
    A: Agent + 'static,
    A::Input: serde::de::DeserializeOwned + Send + 'static,
{
    /// Continue a suspended run under the resuming caller's tenant label.
    ///
    /// Mints a fresh context from `ctx_factory` and stamps `tenant_label`
    /// on it. Under `feature = "governor"` it also wraps the context in
    /// the per-tenant budget governor when one is configured. It then hands
    /// `checkpoint` and `decision` to `resume_from_checkpoint` with the
    /// same system prompt and run options as the original `tools/call`.
    /// Any failure is logged and surfaced as a generic
    /// `ToolError::Permanent`.
    ///
    /// NOTE(review): if `resume_from_checkpoint` can raise a second
    /// suspension, it is collapsed into that generic failure here (no new
    /// ticket is issued). Confirm whether multi-step review needs handling.
    async fn resume(
        &self,
        checkpoint: RunCheckpoint,
        decision: ApprovalDecision,
        tenant_label: String,
    ) -> Result<serde_json::Value, ToolError> {
        // Durability contract: `ctx_factory` has to hand back a context
        // whose `kv` is the same durable bucket the suspend path wrote to.
        // `RunOptions.checkpoint_kv_bucket` keys into it here to claim the
        // ADR-045 one-shot latch (D8). A per-call in-memory store breaks
        // that guarantee; the worktree demo uses a process-shared store
        // (see `klieo-server`).
        let resumed_ctx = (self.ctx_factory)().with_tenant_label(tenant_label);

        // Same attribution AND same per-tenant LLM budget as the original
        // `tools/call`. Skipping the wrap would let a suspend/resume
        // round-trip spend ungoverned (B4 ship-gate bypass). Rebinding
        // instead of `mut` avoids an unused-mut in non-governor builds.
        #[cfg(feature = "governor")]
        let resumed_ctx = match self.governor.as_ref() {
            None => resumed_ctx,
            Some(bundle) => crate::governor::wrap_ctx_with_governor(resumed_ctx, bundle),
        };

        let outcome = resume_from_checkpoint(
            &resumed_ctx,
            &self.system_prompt,
            checkpoint,
            decision,
            self.run_options.clone(),
        )
        .await;

        outcome.map(serde_json::Value::String).map_err(|err| {
            tracing::warn!(
                workflow = %self.name,
                error = %err,
                "workflow resume failed",
            );
            ToolError::Permanent("workflow resume failed".into())
        })
    }
}

impl<A> WorkflowAsToolInvoker<A>
where
    A: Agent + 'static,
    A::Input: serde::de::DeserializeOwned + Send + 'static,
{
    /// Translate a `run_with_hitl` outcome into the `tools/call` reply.
    ///
    /// - Completed run: the final text, as a JSON string.
    /// - `KlieoError::Suspended`: `Ok` with a suspended envelope. A
    ///   one-shot opaque resume ticket is included only when a ticket
    ///   store is wired AND a verified caller principal is present.
    ///   Otherwise the reply is the slice-1 reason-only envelope. The raw
    ///   `ReviewPolicy` reason and the checkpoint bytes never cross the
    ///   wire (CWE-209); the reason is only logged.
    /// - Any other error: logged, then reported as a generic
    ///   `ToolError::Permanent`.
    async fn map_hitl_result(
        &self,
        caller_principal: Option<String>,
        result: Result<String, KlieoError>,
    ) -> Result<serde_json::Value, ToolError> {
        let (reason, checkpoint) = match result {
            Ok(text) => return Ok(serde_json::Value::String(text)),
            Err(KlieoError::Suspended { reason, checkpoint }) => (reason, checkpoint),
            Err(other) => {
                tracing::warn!(workflow = %self.name, error = %other, "workflow execution failed");
                return Err(ToolError::Permanent("workflow execution failed".into()));
            }
        };

        tracing::info!(
            workflow = %self.name,
            policy_reason = %reason,
            "workflow suspended on ReviewPolicy; not echoed to peer",
        );

        let envelope = match caller_principal {
            // Tickets are bound to a verified principal AND persisted in the
            // store; both must be available to issue one.
            Some(principal) if self.ticket_store.is_some() => {
                self.issue_resume_ticket(principal, *checkpoint).await
            }
            _ => {
                tracing::warn!(
                    workflow = %self.name,
                    "resume unavailable (no checkpoint KV / no caller principal)",
                );
                suspended_no_ticket()
            }
        };
        Ok(envelope)
    }
}

/// Append the `tools/call` arguments to `thread` as its opening User turn.
///
/// The `run_steps` loop reacts to short-term memory, so without this seed
/// the run would start on an empty thread (mirrors `SimpleAgent::run`).
/// The arguments are stored as the text produced by `Value::to_string`.
///
/// # Errors
///
/// Returns a generic `ToolError::Permanent` when the short-term append
/// fails. The underlying error is logged against `workflow`, not returned.
async fn seed_user_message(
    ctx: &klieo_core::AgentContext,
    thread: &ThreadId,
    args: &serde_json::Value,
    workflow: &str,
) -> Result<(), ToolError> {
    let opening_turn = Message {
        role: Role::User,
        content: args.to_string(),
        tool_calls: Vec::new(),
        tool_call_id: None,
    };

    if let Err(err) = ctx.short_term.append(thread.clone(), opening_turn).await {
        tracing::warn!(workflow = %workflow, error = %err, "seed user message failed");
        return Err(ToolError::Permanent("workflow input persistence failed".into()));
    }
    Ok(())
}