forge_core/mcp/
context.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::Result;
5use crate::env::{EnvAccess, EnvProvider, RealEnvProvider};
6use crate::function::{AuthContext, JobDispatch, KvHandle, RequestMetadata, WorkflowDispatch};
7use crate::http::CircuitBreakerClient;
8use uuid::Uuid;
9
10#[non_exhaustive]
12pub struct McpToolContext {
13 pub auth: AuthContext,
14 pub request: RequestMetadata,
15 db_pool: sqlx::PgPool,
16 http_client: CircuitBreakerClient,
17 http_timeout: Option<Duration>,
19 job_dispatch: Option<Arc<dyn JobDispatch>>,
20 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
21 env_provider: Arc<dyn EnvProvider>,
22 kv: Option<Arc<dyn KvHandle>>,
23}
24
25impl McpToolContext {
26 pub fn new(db_pool: sqlx::PgPool, auth: AuthContext, request: RequestMetadata) -> Self {
28 Self::with_dispatch(db_pool, auth, request, None, None)
29 }
30
31 pub fn with_dispatch(
33 db_pool: sqlx::PgPool,
34 auth: AuthContext,
35 request: RequestMetadata,
36 job_dispatch: Option<Arc<dyn JobDispatch>>,
37 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
38 ) -> Self {
39 Self::with_env(
40 db_pool,
41 auth,
42 request,
43 job_dispatch,
44 workflow_dispatch,
45 Arc::new(RealEnvProvider::new()),
46 )
47 }
48
49 pub fn with_env(
51 db_pool: sqlx::PgPool,
52 auth: AuthContext,
53 request: RequestMetadata,
54 job_dispatch: Option<Arc<dyn JobDispatch>>,
55 workflow_dispatch: Option<Arc<dyn WorkflowDispatch>>,
56 env_provider: Arc<dyn EnvProvider>,
57 ) -> Self {
58 Self {
59 auth,
60 request,
61 db_pool,
62 http_client: CircuitBreakerClient::with_ssrf_protection(),
63 http_timeout: None,
64 job_dispatch,
65 workflow_dispatch,
66 env_provider,
67 kv: None,
68 }
69 }
70
71 pub fn with_http_client(mut self, client: CircuitBreakerClient) -> Self {
73 self.http_client = client;
74 self
75 }
76
77 pub fn set_kv(&mut self, kv: Arc<dyn KvHandle>) {
80 self.kv = Some(kv);
81 }
82
83 pub fn kv(&self) -> crate::error::Result<&dyn KvHandle> {
85 self.kv
86 .as_deref()
87 .ok_or_else(|| crate::error::ForgeError::internal("KV store not available"))
88 }
89
90 pub fn db(&self) -> crate::function::ForgeDb {
91 crate::function::ForgeDb::from_pool(&self.db_pool)
92 }
93
94 pub fn db_conn(&self) -> crate::function::DbConn<'_> {
96 crate::function::DbConn::Pool(self.db_pool.clone())
97 }
98
99 pub async fn conn(&self) -> sqlx::Result<crate::function::ForgeConn<'static>> {
101 Ok(crate::function::ForgeConn::Pool(
102 self.db_pool.acquire().await?,
103 ))
104 }
105
106 pub fn http(&self) -> crate::http::HttpClient {
108 self.http_client.with_timeout(self.http_timeout)
109 }
110
111 pub fn raw_http(&self) -> &reqwest::Client {
113 self.http_client.inner()
114 }
115
116 pub fn set_http_timeout(&mut self, timeout: Option<Duration>) {
118 self.http_timeout = timeout;
119 }
120
121 pub fn user_id(&self) -> Result<Uuid> {
123 self.auth.require_user_id()
124 }
125
126 pub fn tenant_id(&self) -> Option<Uuid> {
128 self.auth.tenant_id()
129 }
130
131 pub async fn dispatch_job<T: serde::Serialize>(&self, job_type: &str, args: T) -> Result<Uuid> {
133 let dispatcher = self
134 .job_dispatch
135 .as_ref()
136 .ok_or_else(|| crate::error::ForgeError::internal("Job dispatch not available"))?;
137
138 let args_json = serde_json::to_value(args)?;
139 dispatcher
140 .dispatch_by_name(
141 job_type,
142 args_json,
143 self.auth.principal_id(),
144 self.auth.tenant_id(),
145 )
146 .await
147 }
148
149 pub async fn dispatch<J: crate::ForgeJob>(&self, args: J::Args) -> Result<Uuid> {
152 self.dispatch_job(J::info().name, args).await
153 }
154
155 pub async fn cancel_job(&self, job_id: Uuid, reason: Option<String>) -> Result<bool> {
157 let dispatcher = self
158 .job_dispatch
159 .as_ref()
160 .ok_or_else(|| crate::error::ForgeError::internal("Job dispatch not available"))?;
161 dispatcher.cancel(job_id, reason).await
162 }
163
164 pub async fn start_workflow<T: serde::Serialize>(
166 &self,
167 workflow_name: &str,
168 input: T,
169 ) -> Result<Uuid> {
170 let dispatcher = self
171 .workflow_dispatch
172 .as_ref()
173 .ok_or_else(|| crate::error::ForgeError::internal("Workflow dispatch not available"))?;
174
175 let input_json = serde_json::to_value(input)?;
176 dispatcher
177 .start_by_name(
178 workflow_name,
179 input_json,
180 self.auth.principal_id(),
181 Some(self.request.trace_id().to_string()),
182 )
183 .await
184 }
185
186 pub async fn start<W: crate::ForgeWorkflow>(&self, input: W::Input) -> Result<Uuid> {
188 self.start_workflow(W::info().name, input).await
189 }
190}
191
192impl EnvAccess for McpToolContext {
193 fn env_provider(&self) -> &dyn EnvProvider {
194 self.env_provider.as_ref()
195 }
196}