Skip to main content

forge_core/mcp/
context.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::Result;
5use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
6use crate::function::{AuthContext, JobDispatch, KvHandle, RequestMetadata, WorkflowDispatch};
7use crate::http::CircuitBreakerClient;
8use uuid::Uuid;
9
10/// Context for MCP tool execution.
11#[non_exhaustive]
12pub struct McpToolContext {
13    pub auth: AuthContext,
14    pub request: RequestMetadata,
15    db_pool: sqlx::PgPool,
16    http_client: CircuitBreakerClient,
17    /// `None` means unlimited.
18    http_timeout: Option<Duration>,
19    job_dispatch: Option<Arc<dyn JobDispatch>>,
20    workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
21    env_provider: Arc<dyn EnvProvider>,
22    kv: Option<Arc<dyn KvHandle>>,
23}
24
25impl McpToolContext {
26    /// Create a new MCP tool context.
27    pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
28        Self::with_dispatch(db_pool, auth, request, None, None)
29    }
30
31    /// Create a context with dispatch capabilities.
32    pub fn with_dispatch(
33        db_pool: sqlx::PgPool,
34        auth: AuthContext,
35        request: RequestMetadata,
36        job_dispatch: Option<Arc<dyn JobDispatch>>,
37        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
38    ) -> Self {
39        Self::with_env(
40            db_pool,
41            auth,
42            request,
43            job_dispatch,
44            workflow_dispatch,
45            Arc::new(RealEnvProvider::new()),
46        )
47    }
48
49    /// Create a context with a custom environment provider.
50    pub fn with_env(
51        db_pool: sqlx::PgPool,
52        auth: AuthContext,
53        request: RequestMetadata,
54        job_dispatch: Option<Arc<dyn JobDispatch>>,
55        workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
56        env_provider: Arc<dyn EnvProvider>,
57    ) -> Self {
58        Self {
59            auth,
60            request,
61            db_pool,
62            http_client: CircuitBreakerClient::with_ssrf_protection(),
63            http_timeout: None,
64            job_dispatch,
65            workflow_dispatch,
66            env_provider,
67            kv: None,
68        }
69    }
70
71    /// Set the HTTP client. Called by the runtime to inject the shared client.
72    pub fn with_http_client(mut self, client: CircuitBreakerClient) -> Self {
73        self.http_client = client;
74        self
75    }
76
77    /// Attach a KV store handle. Called by the runtime before handing the
78    /// context to the handler.
79    pub fn set_kv(&mut self, kv: Arc<dyn KvHandle>) {
80        self.kv = Some(kv);
81    }
82
83    /// Access the KV store.
84    pub fn kv(&self) -> crate::error::Result<&dyn KvHandle> {
85        self.kv
86            .as_deref()
87            .ok_or_else(|| crate::error::ForgeError::internal("KV store not available"))
88    }
89
90    pub fn db(&self) -> crate::function::ForgeDb {
91        crate::function::ForgeDb::from_pool(&self.db_pool)
92    }
93
94    /// Get a `DbConn` for use in shared helper functions.
95    pub fn db_conn(&self) -> crate::function::DbConn<'_> {
96        crate::function::DbConn::Pool(self.db_pool.clone())
97    }
98
99    /// Acquire a connection compatible with sqlx compile-time checked macros.
100    pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
101        Ok(crate::function::ForgeConn::Pool(
102            self.db_pool.acquire().await?,
103        ))
104    }
105
106    /// Get the HTTP client for external requests.
107    pub fn http(&self) -> crate::http::HttpClient {
108        self.http_client.with_timeout(self.http_timeout)
109    }
110
111    /// Get the raw reqwest client, bypassing circuit breaker execution.
112    pub fn raw_http(&self) -> &reqwest::Client {
113        self.http_client.inner()
114    }
115
116    /// Set the default timeout for outbound HTTP requests.
117    pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
118        self.http_timeout = timeout;
119    }
120
121    /// Get the authenticated user's UUID. Returns 401 if not authenticated.
122    pub fn user_id(&self) -> Result<Uuid> {
123        self.auth.require_user_id()
124    }
125
126    /// Get the tenant ID from JWT claims, if present.
127    pub fn tenant_id(&self) -> Option<Uuid> {
128        self.auth.tenant_id()
129    }
130
131    /// Dispatch a background job.
132    pub async fn dispatch_job<T: serde::Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> {
133        let dispatcher = self
134            .job_dispatch
135            .as_ref()
136            .ok_or_else(|| crate::error::ForgeError::internal("Job dispatch not available"))?;
137
138        let args_json = serde_json::to_value(args)?;
139        dispatcher
140            .dispatch_by_name(
141                job_type,
142                args_json,
143                self.auth.principal_id(),
144                self.auth.tenant_id(),
145            )
146            .await
147    }
148
149    /// Type-safe dispatch: resolves the job name from the type's `ForgeJob`
150    /// impl and serializes the args at the call site.
151    pub async fn dispatch<J: crate::ForgeJob>(&self, args: J::Args) -> Result<Uuid> {
152        self.dispatch_job(J::info().name, args).await
153    }
154
155    /// Request cancellation for a job.
156    pub async fn cancel_job(&self, job_id: Uuid, reason: Option<String>) -> Result<bool> {
157        let dispatcher = self
158            .job_dispatch
159            .as_ref()
160            .ok_or_else(|| crate::error::ForgeError::internal("Job dispatch not available"))?;
161        dispatcher.cancel(job_id, reason).await
162    }
163
164    /// Start a workflow.
165    pub async fn start_workflow<T: serde::Serialize>(
166        &self,
167        workflow_name: &str,
168        input: T,
169    ) -> Result<Uuid> {
170        let dispatcher = self
171            .workflow_dispatch
172            .as_ref()
173            .ok_or_else(|| crate::error::ForgeError::internal("Workflow dispatch not available"))?;
174
175        let input_json = serde_json::to_value(input)?;
176        dispatcher
177            .start_by_name(
178                workflow_name,
179                input_json,
180                self.auth.principal_id(),
181                Some(self.request.trace_id().to_string()),
182            )
183            .await
184    }
185
186    /// Type-safe workflow start.
187    pub async fn start<W: crate::ForgeWorkflow>(&self, input: W::Input) -> Result<Uuid> {
188        self.start_workflow(W::info().name, input).await
189    }
190}
191
192impl EnvAccess for McpToolContext {
193    fn env_provider(&self) -> &dyn EnvProvider {
194        self.env_provider.as_ref()
195    }
196}