1use std::str::FromStr;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, anyhow};
5use async_trait::async_trait;
6use greentic_session::{SessionData, SessionKey as StoreSessionKey};
7use greentic_types::{
8 EnvId, FlowId, GreenticError, PackId, ReplyScope, SessionCursor as TypesSessionCursor,
9 TenantCtx, TenantId, UserId,
10};
11use rand::{Rng, rng};
12use serde::{Deserialize, Serialize};
13use serde_json::{Value, json};
14use sha2::{Digest, Sha256};
15
16use super::api::{RunFlowRequest, RunnerApi};
17use super::builder::{Runner, RunnerBuilder};
18use super::error::{GResult, RunnerError};
19use super::glue::{FnSecretsHost, FnTelemetryHost};
20use super::host::{HostBundle, SecretsHost, SessionHost, StateHost};
21use super::policy::Policy;
22use super::registry::{Adapter, AdapterCall, AdapterRegistry};
23use super::shims::{InMemorySessionHost, InMemoryStateHost};
24use super::state_machine::{FlowDefinition, FlowStep, PAYLOAD_FROM_LAST_INPUT};
25
26use crate::config::{HostConfig, SecretsPolicy};
27use crate::pack::FlowDescriptor;
28use crate::runner::engine::{FlowContext, FlowEngine, FlowSnapshot, FlowStatus, FlowWait};
29use crate::runner::mocks::MockLayer;
30use crate::secrets::{DynSecretsManager, scoped_secret_path};
31use crate::storage::session::DynSessionStore;
32
33const DEFAULT_ENV: &str = "local";
34const PACK_FLOW_ADAPTER: &str = "pack_flow";
35
36#[derive(Clone)]
37pub struct FlowResumeStore {
38 store: DynSessionStore,
39}
40
41impl FlowResumeStore {
42 pub fn new(store: DynSessionStore) -> Self {
43 Self { store }
44 }
45
46 pub fn fetch(&self, envelope: &IngressEnvelope) -> GResult<Option<FlowSnapshot>> {
47 let (mut ctx, user, _, scope) = build_store_ctx(envelope)?;
48 ctx = ctx.with_user(Some(user.clone()));
49
50 let mut scopes = vec![scope.clone()];
51 if scope.correlation.is_some() {
52 let mut base = scope.clone();
53 base.correlation = None;
54 scopes.push(base);
55 }
56
57 for lookup in scopes {
58 if let Some(key) = self
59 .store
60 .find_wait_by_scope(&ctx, &user, &lookup)
61 .map_err(map_store_error)?
62 {
63 let Some(data) = self.store.get_session(&key).map_err(map_store_error)? else {
64 continue;
65 };
66 let record: FlowResumeRecord =
67 serde_json::from_str(&data.context_json).map_err(|err| {
68 RunnerError::Session {
69 reason: format!("failed to decode flow resume snapshot: {err}"),
70 }
71 })?;
72 if record.snapshot.flow_id == envelope.flow_id {
73 if let Some(pack_id) = envelope.pack_id.as_deref()
74 && record.snapshot.pack_id != pack_id
75 {
76 return Err(RunnerError::Session {
77 reason: format!(
78 "resume pack mismatch: expected {pack_id}, found {}",
79 record.snapshot.pack_id
80 ),
81 });
82 }
83 return Ok(Some(record.snapshot));
84 }
85 }
86 }
87
88 Ok(None)
89 }
90
91 pub fn save(&self, envelope: &IngressEnvelope, wait: &FlowWait) -> GResult<ReplyScope> {
92 let (ctx, user, hint, scope) = build_store_ctx(envelope)?;
93 let record = FlowResumeRecord {
94 snapshot: wait.snapshot.clone(),
95 reason: wait.reason.clone(),
96 };
97 let data = record_to_session_data(&record, ctx.clone(), &user, &hint)?;
98 let mut reply_scope = scope.clone();
99 if reply_scope.correlation.is_none() {
100 reply_scope.correlation = Some(generate_correlation_id());
101 }
102 let mut store_scope = scope;
103 store_scope.correlation = None;
104 let session_key = StoreSessionKey::new(format!("{hint}::{}", store_scope.scope_hash()));
105 self.store
106 .register_wait(&ctx, &user, &store_scope, &session_key, data, None)
107 .map_err(map_store_error)?;
108 Ok(reply_scope)
109 }
110
111 pub fn clear(&self, envelope: &IngressEnvelope) -> GResult<()> {
112 let (ctx, user, _, scope) = build_store_ctx(envelope)?;
113 let mut scopes = vec![scope.clone()];
114 if scope.correlation.is_some() {
115 let mut base = scope;
116 base.correlation = None;
117 scopes.push(base);
118 }
119 for lookup in scopes {
120 self.store
121 .clear_wait(&ctx, &user, &lookup)
122 .map_err(map_store_error)?;
123 }
124 Ok(())
125 }
126}
127
128#[derive(Serialize, Deserialize)]
129struct FlowResumeRecord {
130 snapshot: FlowSnapshot,
131 #[serde(default)]
132 reason: Option<String>,
133}
134
135fn build_store_ctx(envelope: &IngressEnvelope) -> GResult<(TenantCtx, UserId, String, ReplyScope)> {
136 let base_hint = envelope
137 .session_hint
138 .clone()
139 .unwrap_or_else(|| envelope.canonical_session_hint());
140 let hint = if let Some(pack_id) = envelope.pack_id.as_deref() {
141 format!("{base_hint}::pack={pack_id}")
142 } else {
143 base_hint.clone()
144 };
145 let user = derive_user_id(&hint)?;
146 let scope = envelope
147 .reply_scope
148 .clone()
149 .ok_or_else(|| RunnerError::Session {
150 reason: "Cannot suspend: reply_scope missing; provider plugin must supply ReplyScope"
151 .to_string(),
152 })?;
153 let mut ctx = envelope.tenant_ctx();
154 ctx = ctx.with_session(hint.clone());
155 ctx = ctx.with_user(Some(user.clone()));
156 Ok((ctx, user, hint, scope))
157}
158
159fn record_to_session_data(
160 record: &FlowResumeRecord,
161 ctx: TenantCtx,
162 user: &UserId,
163 session_hint: &str,
164) -> GResult<SessionData> {
165 let flow = FlowId::from_str(record.snapshot.flow_id.as_str()).map_err(map_store_error)?;
166 let pack = PackId::from_str(record.snapshot.pack_id.as_str()).map_err(map_store_error)?;
167 let mut cursor = TypesSessionCursor::new(record.snapshot.next_node.clone());
168 if let Some(reason) = record.reason.clone() {
169 cursor = cursor.with_wait_reason(reason);
170 }
171 let context_json = serde_json::to_string(record).map_err(|err| RunnerError::Session {
172 reason: format!("failed to encode flow resume snapshot: {err}"),
173 })?;
174 let ctx = ctx
175 .with_user(Some(user.clone()))
176 .with_session(session_hint.to_string())
177 .with_flow(record.snapshot.flow_id.clone());
178 Ok(SessionData {
179 tenant_ctx: ctx,
180 flow_id: flow,
181 pack_id: Some(pack),
182 cursor,
183 context_json,
184 })
185}
186
187fn derive_user_id(hint: &str) -> GResult<UserId> {
188 let digest = Sha256::digest(hint.as_bytes());
189 let slug = format!("sess{}", hex::encode(&digest[..8]));
190 UserId::from_str(&slug).map_err(map_store_error)
191}
192
193fn map_store_error(err: GreenticError) -> RunnerError {
194 RunnerError::Session {
195 reason: err.to_string(),
196 }
197}
198
199fn generate_correlation_id() -> String {
200 let mut bytes = [0u8; 16];
201 rng().fill(&mut bytes);
202 hex::encode(bytes)
203}
204
205#[cfg(test)]
206mod tests {
207 use super::*;
208 use crate::runner::engine::ExecutionState;
209 use crate::storage::session::new_session_store;
210 use serde_json::json;
211
212 fn sample_envelope() -> IngressEnvelope {
213 IngressEnvelope {
214 tenant: "demo".into(),
215 env: Some("local".into()),
216 pack_id: Some("pack.demo".into()),
217 flow_id: "flow.main".into(),
218 flow_type: None,
219 action: Some("messaging".into()),
220 session_hint: Some("demo:provider:chan:conv:user".into()),
221 provider: Some("provider".into()),
222 channel: Some("chan".into()),
223 conversation: Some("conv".into()),
224 user: Some("user".into()),
225 activity_id: Some("act-1".into()),
226 timestamp: None,
227 payload: json!({ "text": "hi" }),
228 metadata: None,
229 reply_scope: Some(ReplyScope {
230 conversation: "conv".into(),
231 thread: None,
232 reply_to: None,
233 correlation: None,
234 }),
235 }
236 }
237
238 fn sample_wait() -> FlowWait {
239 let state: ExecutionState = serde_json::from_value(json!({
240 "input": { "text": "hi" },
241 "nodes": {},
242 "egress": []
243 }))
244 .expect("state");
245 FlowWait {
246 reason: Some("await-user".into()),
247 snapshot: FlowSnapshot {
248 pack_id: "pack.demo".into(),
249 flow_id: "flow.main".into(),
250 next_node: "node-2".into(),
251 state,
252 },
253 }
254 }
255
256 #[test]
257 fn derive_user_id_is_stable() {
258 let hint = "some-tenant::session-key";
259 let a = derive_user_id(hint).unwrap();
260 let b = derive_user_id(hint).unwrap();
261 assert_eq!(a, b);
262 assert!(a.as_str().starts_with("sess"));
263 }
264
265 #[test]
266 fn resume_store_roundtrip() -> GResult<()> {
267 let store = FlowResumeStore::new(new_session_store());
268 let envelope = sample_envelope();
269 assert!(store.fetch(&envelope)?.is_none());
270
271 let wait = sample_wait();
272 let _ = store.save(&envelope, &wait)?;
273 let snapshot = store.fetch(&envelope)?.expect("snapshot missing");
274 assert_eq!(snapshot.flow_id, wait.snapshot.flow_id);
275 assert_eq!(snapshot.next_node, wait.snapshot.next_node);
276
277 store.clear(&envelope)?;
278 assert!(store.fetch(&envelope)?.is_none());
279 Ok(())
280 }
281
282 #[test]
283 fn resume_store_overwrites_existing() -> GResult<()> {
284 let store = FlowResumeStore::new(new_session_store());
285 let envelope = sample_envelope();
286 let mut wait = sample_wait();
287 let _ = store.save(&envelope, &wait)?;
288
289 wait.snapshot.next_node = "node-3".into();
290 wait.reason = Some("retry".into());
291 let _ = store.save(&envelope, &wait)?;
292
293 let snapshot = store.fetch(&envelope)?.expect("snapshot missing");
294 assert_eq!(snapshot.next_node, "node-3");
295 store.clear(&envelope)?;
296 Ok(())
297 }
298
299 #[test]
300 fn canonicalize_populates_defaults() {
301 let envelope = IngressEnvelope {
302 tenant: "demo".into(),
303 env: None,
304 pack_id: None,
305 flow_id: "flow.main".into(),
306 flow_type: None,
307 action: None,
308 session_hint: None,
309 provider: None,
310 channel: None,
311 conversation: None,
312 user: None,
313 activity_id: Some("activity-1".into()),
314 timestamp: None,
315 payload: json!({}),
316 metadata: None,
317 reply_scope: None,
318 }
319 .canonicalize();
320
321 assert_eq!(envelope.provider.as_deref(), Some("provider"));
322 assert_eq!(envelope.channel.as_deref(), Some("flow.main"));
323 assert_eq!(envelope.conversation.as_deref(), Some("flow.main"));
324 assert_eq!(envelope.user.as_deref(), Some("activity-1"));
325 assert!(envelope.session_hint.is_some());
326 }
327}
328
329pub struct StateMachineRuntime {
330 runner: Runner,
331}
332
333impl StateMachineRuntime {
334 pub fn new(flows: Vec<FlowDefinition>) -> GResult<Self> {
336 let secrets = Arc::new(FnSecretsHost::new(|name| {
337 Err(RunnerError::Secrets {
338 reason: format!("secret {name} unavailable (noop host)"),
339 })
340 }));
341 let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
342 let session = Arc::new(InMemorySessionHost::new());
343 let state = Arc::new(InMemoryStateHost::new());
344 let host = HostBundle::new(secrets, telemetry, session, state);
345
346 let adapters = AdapterRegistry::default();
347 let policy = Policy::default();
348
349 let mut builder = RunnerBuilder::new()
350 .with_host(host)
351 .with_adapters(adapters)
352 .with_policy(policy);
353 for flow in flows {
354 builder = builder.with_flow(flow);
355 }
356 let runner = builder.build()?;
357 Ok(Self { runner })
358 }
359
360 pub fn from_flow_engine(
362 config: Arc<HostConfig>,
363 engine: Arc<FlowEngine>,
364 session_host: Arc<dyn SessionHost>,
365 session_store: DynSessionStore,
366 state_host: Arc<dyn StateHost>,
367 secrets_manager: DynSecretsManager,
368 mocks: Option<Arc<MockLayer>>,
369 ) -> Result<Self> {
370 let policy = Arc::new(config.secrets_policy.clone());
371 let tenant_ctx = config.tenant_ctx();
372 let secrets = Arc::new(PolicySecretsHost::new(policy, secrets_manager, tenant_ctx));
373 let telemetry = Arc::new(FnTelemetryHost::new(|span, fields| {
374 tracing::debug!(?span, ?fields, "telemetry emit");
375 Ok(())
376 }));
377 let host = HostBundle::new(secrets, telemetry, session_host, state_host);
378 let resume_store = FlowResumeStore::new(session_store);
379
380 let mut adapters = AdapterRegistry::default();
381 adapters.register(
382 PACK_FLOW_ADAPTER,
383 Box::new(PackFlowAdapter::new(
384 Arc::clone(&config),
385 Arc::clone(&engine),
386 resume_store,
387 mocks,
388 )),
389 );
390
391 let flows = build_flow_definitions(engine.flows());
392 let mut builder = RunnerBuilder::new()
393 .with_host(host)
394 .with_adapters(adapters)
395 .with_policy(Policy::default());
396 for flow in flows {
397 builder = builder.with_flow(flow);
398 }
399 let runner = builder
400 .build()
401 .map_err(|err| anyhow!("state machine init failed: {err}"))?;
402 Ok(Self { runner })
403 }
404
405 pub async fn handle(&self, envelope: IngressEnvelope) -> Result<Value> {
407 let tenant_ctx = envelope.tenant_ctx();
408 let session_hint = envelope
409 .session_hint
410 .clone()
411 .unwrap_or_else(|| envelope.canonical_session_hint());
412 let pack_id = envelope.pack_id.clone().ok_or_else(|| {
413 anyhow!("pack_id missing; ingress must specify pack_id for multi-pack flows")
414 })?;
415 let input =
416 serde_json::to_value(&envelope).context("failed to serialise ingress envelope")?;
417 let request = RunFlowRequest {
418 tenant: tenant_ctx,
419 pack_id,
420 flow_id: envelope.flow_id.clone(),
421 input,
422 session_hint: Some(session_hint),
423 };
424 let result: super::api::RunFlowResult = self
425 .runner
426 .run_flow(request)
427 .await
428 .map_err(|err| anyhow!("flow execution failed: {err}"))?;
429 let outcome = result.outcome;
430 Ok(outcome.get("response").cloned().unwrap_or(outcome))
431 }
432}
433
434struct PolicySecretsHost {
435 policy: Arc<SecretsPolicy>,
436 manager: DynSecretsManager,
437 tenant_ctx: TenantCtx,
438}
439
440impl PolicySecretsHost {
441 fn new(policy: Arc<SecretsPolicy>, manager: DynSecretsManager, tenant_ctx: TenantCtx) -> Self {
442 Self {
443 policy,
444 manager,
445 tenant_ctx,
446 }
447 }
448}
449
450#[async_trait]
451impl SecretsHost for PolicySecretsHost {
452 async fn get(&self, name: &str) -> GResult<String> {
453 if !self.policy.is_allowed(name) {
454 return Err(RunnerError::Secrets {
455 reason: format!("secret {name} denied by policy"),
456 });
457 }
458 let scoped_key =
459 scoped_secret_path(&self.tenant_ctx, name).map_err(|err| RunnerError::Secrets {
460 reason: format!("secret {name} scope invalid: {err}"),
461 })?;
462 let bytes = self
463 .manager
464 .read(scoped_key.as_str())
465 .await
466 .map_err(|err| RunnerError::Secrets {
467 reason: format!("secret {name} unavailable: {err}"),
468 })?;
469 String::from_utf8(bytes).map_err(|err| RunnerError::Secrets {
470 reason: format!("secret {name} not valid UTF-8: {err}"),
471 })
472 }
473}
474
475fn build_flow_definitions(flows: &[FlowDescriptor]) -> Vec<FlowDefinition> {
476 flows
477 .iter()
478 .map(|descriptor| {
479 FlowDefinition::new(
480 super::api::FlowSummary {
481 pack_id: descriptor.pack_id.clone(),
482 id: descriptor.id.clone(),
483 name: descriptor
484 .description
485 .clone()
486 .unwrap_or_else(|| descriptor.id.clone()),
487 version: descriptor.version.clone(),
488 description: descriptor.description.clone(),
489 },
490 serde_json::json!({
491 "type": "object"
492 }),
493 vec![FlowStep::Adapter(AdapterCall {
494 adapter: PACK_FLOW_ADAPTER.into(),
495 operation: descriptor.id.clone(),
496 payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
497 })],
498 )
499 })
500 .collect()
501}
502
503struct PackFlowAdapter {
504 tenant: String,
505 config: Arc<HostConfig>,
506 engine: Arc<FlowEngine>,
507 resume: FlowResumeStore,
508 mocks: Option<Arc<MockLayer>>,
509}
510
511impl PackFlowAdapter {
512 fn new(
513 config: Arc<HostConfig>,
514 engine: Arc<FlowEngine>,
515 resume: FlowResumeStore,
516 mocks: Option<Arc<MockLayer>>,
517 ) -> Self {
518 Self {
519 tenant: config.tenant.clone(),
520 config,
521 engine,
522 resume,
523 mocks,
524 }
525 }
526}
527
528#[async_trait::async_trait]
529impl Adapter for PackFlowAdapter {
530 async fn call(&self, call: &AdapterCall) -> GResult<Value> {
531 let envelope: IngressEnvelope =
532 serde_json::from_value(call.payload.clone()).map_err(|err| {
533 RunnerError::AdapterCall {
534 reason: format!("invalid ingress payload: {err}"),
535 }
536 })?;
537 let envelope = envelope.canonicalize();
538 let flow_id = call.operation.clone();
539 let action_owned = envelope.action.clone();
540 let session_owned = envelope
541 .session_hint
542 .clone()
543 .unwrap_or_else(|| envelope.canonical_session_hint());
544 let provider_owned = envelope.provider.clone();
545 let payload = envelope.payload.clone();
546 let retry_config = self.config.retry_config().into();
547
548 let pack_id = if let Some(pack_id) = envelope.pack_id.as_deref() {
549 let found = self.engine.flow_by_key(pack_id, &flow_id).is_some();
550 if !found {
551 return Err(RunnerError::AdapterCall {
552 reason: format!("flow {flow_id} not registered for pack {pack_id}"),
553 });
554 }
555 pack_id
556 } else if let Some(flow) = self.engine.flow_by_id(&flow_id) {
557 flow.pack_id.as_str()
558 } else {
559 return Err(RunnerError::AdapterCall {
560 reason: format!("flow {flow_id} is ambiguous; pack_id is required"),
561 });
562 };
563
564 let mocks = self.mocks.as_deref();
565 let ctx = FlowContext {
566 tenant: &self.tenant,
567 pack_id,
568 flow_id: &flow_id,
569 node_id: None,
570 tool: None,
571 action: action_owned.as_deref(),
572 session_id: Some(session_owned.as_str()),
573 provider_id: provider_owned.as_deref(),
574 retry_config,
575 observer: None,
576 mocks,
577 };
578
579 let execution = if let Some(snapshot) = self.resume.fetch(&envelope)? {
580 let resume_pack_id = snapshot.pack_id.clone();
581 let resume_ctx = FlowContext {
582 pack_id: resume_pack_id.as_str(),
583 ..ctx
584 };
585 self.engine.resume(resume_ctx, snapshot, payload).await
586 } else {
587 self.engine.execute(ctx, payload).await
588 }
589 .map_err(|err| RunnerError::AdapterCall {
590 reason: err.to_string(),
591 })?;
592
593 match execution.status {
594 FlowStatus::Completed => {
595 self.resume.clear(&envelope)?;
596 Ok(execution.output)
597 }
598 FlowStatus::Waiting(wait) => {
599 let reply_scope = self.resume.save(&envelope, &wait)?;
600 Ok(json!({
601 "status": "pending",
602 "reason": wait.reason,
603 "resume": wait.snapshot,
604 "reply_scope": reply_scope,
605 "response": execution.output,
606 }))
607 }
608 }
609 }
610}
611
612#[derive(Clone, Debug, Serialize, Deserialize)]
613pub struct IngressEnvelope {
614 pub tenant: String,
615 #[serde(default, skip_serializing_if = "Option::is_none")]
616 pub env: Option<String>,
617 #[serde(default, skip_serializing_if = "Option::is_none")]
618 pub pack_id: Option<String>,
619 pub flow_id: String,
620 #[serde(default, skip_serializing_if = "Option::is_none")]
621 pub flow_type: Option<String>,
622 #[serde(default, skip_serializing_if = "Option::is_none")]
623 pub action: Option<String>,
624 #[serde(default, skip_serializing_if = "Option::is_none")]
625 pub session_hint: Option<String>,
626 #[serde(default, skip_serializing_if = "Option::is_none")]
627 pub provider: Option<String>,
628 #[serde(default, skip_serializing_if = "Option::is_none")]
629 pub channel: Option<String>,
630 #[serde(default, skip_serializing_if = "Option::is_none")]
631 pub conversation: Option<String>,
632 #[serde(default, skip_serializing_if = "Option::is_none")]
633 pub user: Option<String>,
634 #[serde(default, skip_serializing_if = "Option::is_none")]
635 pub activity_id: Option<String>,
636 #[serde(default, skip_serializing_if = "Option::is_none")]
637 pub timestamp: Option<String>,
638 #[serde(default)]
639 pub payload: Value,
640 #[serde(default, skip_serializing_if = "Option::is_none")]
641 pub metadata: Option<Value>,
642 #[serde(default, skip_serializing_if = "Option::is_none")]
643 pub reply_scope: Option<ReplyScope>,
644}
645
646impl IngressEnvelope {
647 pub fn canonicalize(mut self) -> Self {
648 if self.provider.is_none() {
649 self.provider = Some("provider".into());
650 }
651 if self.channel.is_none() {
652 self.channel = Some(self.flow_id.clone());
653 }
654 if self.conversation.is_none() {
655 self.conversation = self.channel.clone();
656 }
657 if self.user.is_none() {
658 if let Some(ref hint) = self.session_hint {
659 self.user = Some(hint.clone());
660 } else if let Some(ref activity) = self.activity_id {
661 self.user = Some(activity.clone());
662 } else {
663 self.user = Some("user".into());
664 }
665 }
666 if self.session_hint.is_none() {
667 self.session_hint = Some(self.canonical_session_hint());
668 }
669 if self.reply_scope.is_none()
670 && let Some(conversation) = self.conversation.clone()
671 {
672 self.reply_scope = Some(ReplyScope {
673 conversation,
674 thread: None,
675 reply_to: None,
676 correlation: None,
677 });
678 }
679 self
680 }
681
682 pub fn canonical_session_hint(&self) -> String {
683 format!(
684 "{}:{}:{}:{}:{}",
685 self.tenant,
686 self.provider.as_deref().unwrap_or("provider"),
687 self.channel.as_deref().unwrap_or("channel"),
688 self.conversation.as_deref().unwrap_or("conversation"),
689 self.user.as_deref().unwrap_or("user")
690 )
691 }
692
693 pub fn tenant_ctx(&self) -> TenantCtx {
694 let env_raw = self.env.clone().unwrap_or_else(|| DEFAULT_ENV.into());
695 let env = EnvId::from_str(env_raw.as_str())
696 .unwrap_or_else(|_| EnvId::from_str(DEFAULT_ENV).expect("default env must be valid"));
697 let tenant_id = TenantId::from_str(self.tenant.as_str()).unwrap_or_else(|_| {
698 TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
699 });
700 let mut ctx = TenantCtx::new(env, tenant_id).with_flow(self.flow_id.clone());
701 if let Some(provider) = &self.provider {
702 ctx = ctx.with_provider(provider.clone());
703 }
704 if let Some(session) = &self.session_hint {
705 ctx = ctx.with_session(session.clone());
706 }
707 ctx
708 }
709}