Skip to main content

gestalt/
workflow_provider.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use serde::Serialize;
5use serde_json::Value;
6use tonic::codegen::async_trait;
7use tonic::{Request as GrpcRequest, Response as GrpcResponse, Status};
8
9use crate::api::RuntimeMetadata;
10use crate::error::Result as ProviderResult;
11use crate::generated::v1 as pb;
12use crate::protocol;
13use crate::rpc_status::rpc_status;
14use crate::{Error, Result};
15
16/// Native JSON object used by authored workflow providers.
17pub type WorkflowJson = serde_json::Value;
18
19pub use pb::{workflow_activation, workflow_run_trigger, workflow_step, workflow_value};
20
21/// Alias for `pb::BoundWorkflowTarget`.
22pub type BoundWorkflowTarget = pb::BoundWorkflowTarget;
23/// Alias for `pb::WorkflowStep`.
24pub type WorkflowStep = pb::WorkflowStep;
25/// Alias for `pb::workflow_step::Action`.
26pub type WorkflowStepAction = pb::workflow_step::Action;
27/// Alias for `pb::WorkflowStepAppCall`.
28pub type WorkflowStepAppCall = pb::WorkflowStepAppCall;
29/// Alias for `pb::WorkflowStepAgentTurn`.
30pub type WorkflowStepAgentTurn = pb::WorkflowStepAgentTurn;
31/// Alias for `pb::WorkflowAgentMessage`.
32pub type WorkflowAgentMessage = pb::WorkflowAgentMessage;
33/// Alias for `pb::WorkflowText`.
34pub type WorkflowText = pb::WorkflowText;
35/// Alias for `pb::WorkflowStepWhen`.
36pub type WorkflowStepWhen = pb::WorkflowStepWhen;
37/// Alias for `pb::WorkflowValue`.
38pub type WorkflowValue = pb::WorkflowValue;
39/// Alias for `pb::WorkflowObject`.
40pub type WorkflowObject = pb::WorkflowObject;
41/// Alias for `pb::WorkflowArray`.
42pub type WorkflowArray = pb::WorkflowArray;
43/// Alias for `pb::WorkflowPathSource`.
44pub type WorkflowPathSource = pb::WorkflowPathSource;
45/// Alias for `pb::WorkflowStepOutputSource`.
46pub type WorkflowStepOutputSource = pb::WorkflowStepOutputSource;
47/// Alias for `pb::WorkflowStepInputSource`.
48pub type WorkflowStepInputSource = pb::WorkflowStepInputSource;
49/// Alias for `pb::WorkflowEvent`.
50pub type WorkflowEvent = pb::WorkflowEvent;
51/// Alias for `pb::WorkflowEventMatch`.
52pub type WorkflowEventMatch = pb::WorkflowEventMatch;
53/// Alias for `pb::WorkflowScheduleActivation`.
54pub type WorkflowScheduleActivation = pb::WorkflowScheduleActivation;
55/// Alias for `pb::WorkflowEventActivation`.
56pub type WorkflowEventActivation = pb::WorkflowEventActivation;
57/// Alias for `pb::WorkflowActivation`.
58pub type WorkflowActivation = pb::WorkflowActivation;
59/// Alias for `pb::WorkflowDefinitionSpec`.
60pub type WorkflowDefinitionSpec = pb::WorkflowDefinitionSpec;
61/// Alias for `pb::WorkflowDefinition`.
62pub type WorkflowDefinition = pb::WorkflowDefinition;
63/// Alias for `pb::WorkflowManualTrigger`.
64pub type WorkflowManualTrigger = pb::WorkflowManualTrigger;
65/// Alias for `pb::WorkflowScheduleTrigger`.
66pub type WorkflowScheduleTrigger = pb::WorkflowScheduleTrigger;
67/// Alias for `pb::WorkflowEventTriggerInvocation`.
68pub type WorkflowEventTriggerInvocation = pb::WorkflowEventTriggerInvocation;
69/// Alias for `pb::WorkflowRunTrigger`.
70pub type WorkflowRunTrigger = pb::WorkflowRunTrigger;
71/// Alias for `pb::WorkflowStepAttempt`.
72pub type WorkflowStepAttempt = pb::WorkflowStepAttempt;
73/// Alias for `pb::WorkflowStepExecution`.
74pub type WorkflowStepExecution = pb::WorkflowStepExecution;
75/// Alias for `pb::WorkflowRun`.
76pub type WorkflowRun = pb::WorkflowRun;
77/// Alias for `pb::WorkflowSignal`.
78pub type WorkflowSignal = pb::WorkflowSignal;
79/// Alias for `pb::SignalWorkflowRunResponse`.
80pub type SignalWorkflowRunResponse = pb::SignalWorkflowRunResponse;
81/// Alias for `pb::WorkflowRunEvent`.
82pub type WorkflowRunEvent = pb::WorkflowRunEvent;
83/// Alias for `pb::WorkflowRunStatus`.
84pub type WorkflowRunStatus = pb::WorkflowRunStatus;
85/// Alias for `pb::WorkflowStepStatus`.
86pub type WorkflowStepStatus = pb::WorkflowStepStatus;
87
88/// Alias for `pb::ApplyWorkflowProviderDefinitionRequest`.
89pub type ApplyWorkflowProviderDefinitionRequest = pb::ApplyWorkflowProviderDefinitionRequest;
90/// Alias for `pb::GetWorkflowProviderDefinitionRequest`.
91pub type GetWorkflowProviderDefinitionRequest = pb::GetWorkflowProviderDefinitionRequest;
92/// Alias for `pb::ListWorkflowProviderDefinitionsRequest`.
93pub type ListWorkflowProviderDefinitionsRequest = pb::ListWorkflowProviderDefinitionsRequest;
94/// Alias for `pb::ListWorkflowProviderDefinitionsResponse`.
95pub type ListWorkflowProviderDefinitionsResponse = pb::ListWorkflowProviderDefinitionsResponse;
96/// Alias for the definitionpaused request message.
97pub type SetWorkflowProviderDefinitionPausedRequest =
98    pb::SetWorkflowProviderDefinitionPausedRequest;
99/// Alias for the activationpaused request message.
100pub type SetWorkflowProviderActivationPausedRequest =
101    pb::SetWorkflowProviderActivationPausedRequest;
102/// Alias for `pb::DeleteWorkflowProviderDefinitionRequest`.
103pub type DeleteWorkflowProviderDefinitionRequest = pb::DeleteWorkflowProviderDefinitionRequest;
104/// Alias for `pb::StartWorkflowProviderRunRequest`.
105pub type StartWorkflowProviderRunRequest = pb::StartWorkflowProviderRunRequest;
106/// Alias for `pb::GetWorkflowProviderRunRequest`.
107pub type GetWorkflowProviderRunRequest = pb::GetWorkflowProviderRunRequest;
108/// Alias for `pb::ListWorkflowProviderRunsRequest`.
109pub type ListWorkflowProviderRunsRequest = pb::ListWorkflowProviderRunsRequest;
110/// Alias for `pb::ListWorkflowProviderRunsResponse`.
111pub type ListWorkflowProviderRunsResponse = pb::ListWorkflowProviderRunsResponse;
112/// Alias for `pb::GetWorkflowProviderRunEventsRequest`.
113pub type GetWorkflowProviderRunEventsRequest = pb::GetWorkflowProviderRunEventsRequest;
114/// Alias for `pb::GetWorkflowProviderRunEventsResponse`.
115pub type GetWorkflowProviderRunEventsResponse = pb::GetWorkflowProviderRunEventsResponse;
116/// Alias for `pb::GetWorkflowProviderRunOutputRequest`.
117pub type GetWorkflowProviderRunOutputRequest = pb::GetWorkflowProviderRunOutputRequest;
118/// Alias for `pb::GetWorkflowProviderRunOutputResponse`.
119pub type GetWorkflowProviderRunOutputResponse = pb::GetWorkflowProviderRunOutputResponse;
120/// Alias for `pb::CancelWorkflowProviderRunRequest`.
121pub type CancelWorkflowProviderRunRequest = pb::CancelWorkflowProviderRunRequest;
122/// Alias for `pb::SignalWorkflowProviderRunRequest`.
123pub type SignalWorkflowProviderRunRequest = pb::SignalWorkflowProviderRunRequest;
124/// Alias for `pb::SignalOrStartWorkflowProviderRunRequest`.
125pub type SignalOrStartWorkflowProviderRunRequest = pb::SignalOrStartWorkflowProviderRunRequest;
126/// Alias for `pb::DeliverWorkflowProviderEventRequest`.
127pub type DeliverWorkflowProviderEventRequest = pb::DeliverWorkflowProviderEventRequest;
128
129/// Validates and returns the bound workflow target value.
130pub fn new_bound_workflow_target(
131    input: BoundWorkflowTarget,
132) -> ProviderResult<BoundWorkflowTarget> {
133    Ok(input)
134}
135
136/// Builds the value from an existing target.
137pub fn new_bound_workflow_target_from_target(
138    input: &BoundWorkflowTarget,
139) -> ProviderResult<BoundWorkflowTarget> {
140    Ok(input.clone())
141}
142
143/// Validates and returns the workflow definition spec value.
144pub fn new_workflow_definition_spec(
145    input: WorkflowDefinitionSpec,
146) -> ProviderResult<WorkflowDefinitionSpec> {
147    Ok(input)
148}
149
150/// Validates and returns the workflow definition value.
151pub fn new_workflow_definition(input: WorkflowDefinition) -> ProviderResult<WorkflowDefinition> {
152    Ok(input)
153}
154
155/// Validates and returns the workflow run value.
156pub fn new_workflow_run(input: WorkflowRun) -> ProviderResult<WorkflowRun> {
157    Ok(input)
158}
159
160/// Builds the value from an existing run.
161pub fn new_workflow_run_from_run(input: &WorkflowRun) -> ProviderResult<WorkflowRun> {
162    Ok(input.clone())
163}
164
165/// Validates and returns the workflow event value.
166pub fn new_workflow_event(input: WorkflowEvent) -> ProviderResult<WorkflowEvent> {
167    Ok(input)
168}
169
170/// Builds the value from an existing event.
171pub fn new_workflow_event_from_event(input: &WorkflowEvent) -> ProviderResult<WorkflowEvent> {
172    Ok(input.clone())
173}
174
175/// Validates and returns the workflow event match value.
176pub fn new_workflow_event_match(input: WorkflowEventMatch) -> WorkflowEventMatch {
177    input
178}
179
180/// Validates and returns the workflow signal value.
181pub fn new_workflow_signal(input: WorkflowSignal) -> ProviderResult<WorkflowSignal> {
182    Ok(input)
183}
184
185/// Builds the value from an existing signal.
186pub fn new_workflow_signal_from_signal(input: &WorkflowSignal) -> ProviderResult<WorkflowSignal> {
187    Ok(input.clone())
188}
189
190/// Validates and returns the workflow step value.
191pub fn new_workflow_step(input: WorkflowStep) -> ProviderResult<WorkflowStep> {
192    Ok(input)
193}
194
195/// Validates and returns the workflow step app call value.
196pub fn new_workflow_step_app_call(
197    input: WorkflowStepAppCall,
198) -> ProviderResult<WorkflowStepAppCall> {
199    Ok(input)
200}
201
202/// Validates and returns the workflow step agent turn value.
203pub fn new_workflow_step_agent_turn(
204    input: WorkflowStepAgentTurn,
205) -> ProviderResult<WorkflowStepAgentTurn> {
206    Ok(input)
207}
208
209/// Validates and returns the workflow agent message value.
210pub fn new_workflow_agent_message(
211    input: WorkflowAgentMessage,
212) -> ProviderResult<WorkflowAgentMessage> {
213    Ok(input)
214}
215
216/// Validates and returns the workflow step when value.
217pub fn new_workflow_step_when(input: WorkflowStepWhen) -> ProviderResult<WorkflowStepWhen> {
218    Ok(input)
219}
220
221/// Validates and returns the workflow text value.
222pub fn new_workflow_text(input: WorkflowText) -> WorkflowText {
223    input
224}
225
226/// Validates and returns the workflow value value.
227pub fn new_workflow_value(input: WorkflowValue) -> ProviderResult<WorkflowValue> {
228    Ok(input)
229}
230
231/// Builds a workflow value carrying a literal JSON value.
232pub fn workflow_value_literal<T: Serialize>(value: T) -> ProviderResult<WorkflowValue> {
233    Ok(WorkflowValue {
234        kind: Some(workflow_value::Kind::Literal(protocol::value_from_json(
235            protocol::json_from_serializable(value)?,
236        ))),
237    })
238}
239
240/// Builds a workflow value from named member values.
241pub fn workflow_value_object(fields: BTreeMap<String, WorkflowValue>) -> WorkflowValue {
242    WorkflowValue {
243        kind: Some(workflow_value::Kind::Object(WorkflowObject { fields })),
244    }
245}
246
247/// Builds a workflow value from element values.
248pub fn workflow_value_array(values: Vec<WorkflowValue>) -> WorkflowValue {
249    WorkflowValue {
250        kind: Some(workflow_value::Kind::Array(WorkflowArray { values })),
251    }
252}
253
254/// Builds a workflow value rendered from a text template.
255pub fn workflow_value_template(template: impl Into<String>) -> WorkflowValue {
256    WorkflowValue {
257        kind: Some(workflow_value::Kind::Template(WorkflowText {
258            template: template.into(),
259        })),
260    }
261}
262
263/// Builds a workflow value read from a run input path.
264pub fn workflow_value_input(path: impl Into<String>) -> WorkflowValue {
265    WorkflowValue {
266        kind: Some(workflow_value::Kind::Input(WorkflowPathSource {
267            path: path.into(),
268        })),
269    }
270}
271
272/// Builds a workflow value read from a delivered signal path.
273pub fn workflow_value_signal(path: impl Into<String>) -> WorkflowValue {
274    WorkflowValue {
275        kind: Some(workflow_value::Kind::Signal(WorkflowPathSource {
276            path: path.into(),
277        })),
278    }
279}
280
281/// Builds a workflow value read from a prior step's output.
282pub fn workflow_value_step_output(
283    step_id: impl Into<String>,
284    path: impl Into<String>,
285) -> WorkflowValue {
286    WorkflowValue {
287        kind: Some(workflow_value::Kind::StepOutput(WorkflowStepOutputSource {
288            step_id: step_id.into(),
289            path: path.into(),
290        })),
291    }
292}
293
294/// Builds a workflow value read from a step's input.
295pub fn workflow_value_step_input(
296    step_id: impl Into<String>,
297    path: impl Into<String>,
298) -> WorkflowValue {
299    WorkflowValue {
300        kind: Some(workflow_value::Kind::StepInput(WorkflowStepInputSource {
301            step_id: step_id.into(),
302            path: path.into(),
303        })),
304    }
305}
306
307/// Builds the input form of an existing event.
308pub fn workflow_event_input_from_event(input: &WorkflowEvent) -> WorkflowEvent {
309    input.clone()
310}
311
312/// Builds the input form of an existing match.
313pub fn workflow_event_match_input_from_match(input: &WorkflowEventMatch) -> WorkflowEventMatch {
314    input.clone()
315}
316
317/// Builds the input form of an existing signal.
318pub fn workflow_signal_input_from_signal(input: &WorkflowSignal) -> WorkflowSignal {
319    input.clone()
320}
321
322/// Builds the input form of an existing step.
323pub fn workflow_step_input_from_step(input: &WorkflowStep) -> WorkflowStep {
324    input.clone()
325}
326
327/// Builds the input form of an existing call.
328pub fn workflow_step_app_call_input_from_call(input: &WorkflowStepAppCall) -> WorkflowStepAppCall {
329    input.clone()
330}
331
332/// Builds the input form of an existing turn.
333pub fn workflow_step_agent_turn_input_from_turn(
334    input: &WorkflowStepAgentTurn,
335) -> WorkflowStepAgentTurn {
336    input.clone()
337}
338
339/// Builds the input form of an existing value.
340pub fn workflow_value_input_from_value(input: &WorkflowValue) -> WorkflowValue {
341    input.clone()
342}
343
344/// Builds the input form of an existing trigger.
345pub fn workflow_run_trigger_input_from_trigger(input: &WorkflowRunTrigger) -> WorkflowRunTrigger {
346    input.clone()
347}
348
349#[async_trait]
350/// Provider trait for serving the Gestalt workflow-provider protocol.
351pub trait WorkflowProvider: Send + Sync + 'static {
352    /// Configures the provider before it starts serving requests.
353    async fn configure(
354        &self,
355        _name: &str,
356        _config: serde_json::Map<String, serde_json::Value>,
357    ) -> ProviderResult<()> {
358        Ok(())
359    }
360
361    /// Returns runtime metadata that should augment the static manifest.
362    fn metadata(&self) -> Option<RuntimeMetadata> {
363        None
364    }
365
366    /// Returns non-fatal warnings the host should surface to users.
367    fn warnings(&self) -> Vec<String> {
368        Vec::new()
369    }
370
371    /// Performs an optional health check.
372    async fn health_check(&self) -> ProviderResult<()> {
373        Ok(())
374    }
375
376    /// Starts provider-owned background work after configuration.
377    async fn start(&self) -> ProviderResult<()> {
378        Ok(())
379    }
380
381    /// Shuts the provider down before the runtime exits.
382    async fn close(&self) -> ProviderResult<()> {
383        Ok(())
384    }
385
386    /// Applies a workflow definition, creating or updating it.
387    async fn apply_definition(
388        &self,
389        _request: ApplyWorkflowProviderDefinitionRequest,
390    ) -> ProviderResult<WorkflowDefinition> {
391        Err(crate::Error::unimplemented(
392            "workflow apply definition is not implemented",
393        ))
394    }
395
396    /// Fetches one workflow definition.
397    async fn get_definition(
398        &self,
399        _request: GetWorkflowProviderDefinitionRequest,
400    ) -> ProviderResult<WorkflowDefinition> {
401        Err(crate::Error::unimplemented(
402            "workflow get definition is not implemented",
403        ))
404    }
405
406    /// Lists workflow definitions.
407    async fn list_definitions(
408        &self,
409        _request: ListWorkflowProviderDefinitionsRequest,
410    ) -> ProviderResult<ListWorkflowProviderDefinitionsResponse> {
411        Err(crate::Error::unimplemented(
412            "workflow list definitions is not implemented",
413        ))
414    }
415
416    /// Pauses or resumes a workflow definition.
417    async fn set_definition_paused(
418        &self,
419        _request: SetWorkflowProviderDefinitionPausedRequest,
420    ) -> ProviderResult<WorkflowDefinition> {
421        Err(crate::Error::unimplemented(
422            "workflow set definition paused is not implemented",
423        ))
424    }
425
426    /// Pauses or resumes one activation.
427    async fn set_activation_paused(
428        &self,
429        _request: SetWorkflowProviderActivationPausedRequest,
430    ) -> ProviderResult<WorkflowDefinition> {
431        Err(crate::Error::unimplemented(
432            "workflow set activation paused is not implemented",
433        ))
434    }
435
436    /// Deletes a workflow definition.
437    async fn delete_definition(
438        &self,
439        _request: DeleteWorkflowProviderDefinitionRequest,
440    ) -> ProviderResult<()> {
441        Err(crate::Error::unimplemented(
442            "workflow delete definition is not implemented",
443        ))
444    }
445
446    /// Starts a workflow run.
447    async fn start_run(
448        &self,
449        _request: StartWorkflowProviderRunRequest,
450    ) -> ProviderResult<WorkflowRun> {
451        Err(crate::Error::unimplemented(
452            "workflow start run is not implemented",
453        ))
454    }
455
456    /// Lists workflow runs.
457    async fn list_runs(
458        &self,
459        _request: ListWorkflowProviderRunsRequest,
460    ) -> ProviderResult<ListWorkflowProviderRunsResponse> {
461        Err(crate::Error::unimplemented(
462            "workflow list runs is not implemented",
463        ))
464    }
465
466    /// Fetches one workflow run.
467    async fn get_run(
468        &self,
469        _request: GetWorkflowProviderRunRequest,
470    ) -> ProviderResult<WorkflowRun> {
471        Err(crate::Error::unimplemented(
472            "workflow get run is not implemented",
473        ))
474    }
475
476    /// Fetches the events of a workflow run.
477    async fn get_run_events(
478        &self,
479        _request: GetWorkflowProviderRunEventsRequest,
480    ) -> ProviderResult<GetWorkflowProviderRunEventsResponse> {
481        Err(crate::Error::unimplemented(
482            "workflow get run events is not implemented",
483        ))
484    }
485
486    /// Fetches the output value of a workflow run.
487    async fn get_run_output(
488        &self,
489        _request: GetWorkflowProviderRunOutputRequest,
490    ) -> ProviderResult<GetWorkflowProviderRunOutputResponse> {
491        Err(crate::Error::unimplemented(
492            "workflow get run output is not implemented",
493        ))
494    }
495
496    /// Cancels a workflow run.
497    async fn cancel_run(
498        &self,
499        _request: CancelWorkflowProviderRunRequest,
500    ) -> ProviderResult<WorkflowRun> {
501        Err(crate::Error::unimplemented(
502            "workflow cancel run is not implemented",
503        ))
504    }
505
506    /// Signals an existing workflow run.
507    async fn signal_run(
508        &self,
509        _request: SignalWorkflowProviderRunRequest,
510    ) -> ProviderResult<SignalWorkflowRunResponse> {
511        Err(crate::Error::unimplemented(
512            "workflow signal run is not implemented",
513        ))
514    }
515
516    /// Signals a run, starting it first when absent.
517    async fn signal_or_start_run(
518        &self,
519        _request: SignalOrStartWorkflowProviderRunRequest,
520    ) -> ProviderResult<SignalWorkflowRunResponse> {
521        Err(crate::Error::unimplemented(
522            "workflow signal or start run is not implemented",
523        ))
524    }
525
526    /// Delivers an external event to the workflow engine.
527    async fn deliver_event(
528        &self,
529        _request: DeliverWorkflowProviderEventRequest,
530    ) -> ProviderResult<WorkflowEvent> {
531        Err(crate::Error::unimplemented(
532            "workflow deliver event is not implemented",
533        ))
534    }
535}
536
537#[derive(Clone)]
538pub(crate) struct WorkflowServer<P> {
539    provider: Arc<P>,
540}
541
542impl<P> WorkflowServer<P> {
543    pub(crate) fn new(provider: Arc<P>) -> Self {
544        Self { provider }
545    }
546}
547
548#[async_trait]
549impl<P> pb::workflow_server::Workflow for WorkflowServer<P>
550where
551    P: WorkflowProvider,
552{
553    async fn apply_definition(
554        &self,
555        request: GrpcRequest<pb::ApplyWorkflowProviderDefinitionRequest>,
556    ) -> std::result::Result<GrpcResponse<pb::WorkflowDefinition>, Status> {
557        let definition = self
558            .provider
559            .apply_definition(request.into_inner())
560            .await
561            .map_err(|error| rpc_status("workflow apply definition", error))?;
562        Ok(GrpcResponse::new(definition))
563    }
564
565    async fn get_definition(
566        &self,
567        request: GrpcRequest<pb::GetWorkflowProviderDefinitionRequest>,
568    ) -> std::result::Result<GrpcResponse<pb::WorkflowDefinition>, Status> {
569        let definition = self
570            .provider
571            .get_definition(request.into_inner())
572            .await
573            .map_err(|error| rpc_status("workflow get definition", error))?;
574        Ok(GrpcResponse::new(definition))
575    }
576
577    async fn list_definitions(
578        &self,
579        request: GrpcRequest<pb::ListWorkflowProviderDefinitionsRequest>,
580    ) -> std::result::Result<GrpcResponse<pb::ListWorkflowProviderDefinitionsResponse>, Status>
581    {
582        let response = self
583            .provider
584            .list_definitions(request.into_inner())
585            .await
586            .map_err(|error| rpc_status("workflow list definitions", error))?;
587        Ok(GrpcResponse::new(response))
588    }
589
590    async fn set_definition_paused(
591        &self,
592        request: GrpcRequest<pb::SetWorkflowProviderDefinitionPausedRequest>,
593    ) -> std::result::Result<GrpcResponse<pb::WorkflowDefinition>, Status> {
594        let definition = self
595            .provider
596            .set_definition_paused(request.into_inner())
597            .await
598            .map_err(|error| rpc_status("workflow set definition paused", error))?;
599        Ok(GrpcResponse::new(definition))
600    }
601
602    async fn set_activation_paused(
603        &self,
604        request: GrpcRequest<pb::SetWorkflowProviderActivationPausedRequest>,
605    ) -> std::result::Result<GrpcResponse<pb::WorkflowDefinition>, Status> {
606        let definition = self
607            .provider
608            .set_activation_paused(request.into_inner())
609            .await
610            .map_err(|error| rpc_status("workflow set activation paused", error))?;
611        Ok(GrpcResponse::new(definition))
612    }
613
614    async fn delete_definition(
615        &self,
616        request: GrpcRequest<pb::DeleteWorkflowProviderDefinitionRequest>,
617    ) -> std::result::Result<GrpcResponse<()>, Status> {
618        self.provider
619            .delete_definition(request.into_inner())
620            .await
621            .map_err(|error| rpc_status("workflow delete definition", error))?;
622        Ok(GrpcResponse::new(()))
623    }
624
625    async fn start_run(
626        &self,
627        request: GrpcRequest<pb::StartWorkflowProviderRunRequest>,
628    ) -> std::result::Result<GrpcResponse<pb::WorkflowRun>, Status> {
629        let run = self
630            .provider
631            .start_run(request.into_inner())
632            .await
633            .map_err(|error| rpc_status("workflow start run", error))?;
634        Ok(GrpcResponse::new(run))
635    }
636
637    async fn list_runs(
638        &self,
639        request: GrpcRequest<pb::ListWorkflowProviderRunsRequest>,
640    ) -> std::result::Result<GrpcResponse<pb::ListWorkflowProviderRunsResponse>, Status> {
641        let response = self
642            .provider
643            .list_runs(request.into_inner())
644            .await
645            .map_err(|error| rpc_status("workflow list runs", error))?;
646        Ok(GrpcResponse::new(response))
647    }
648
649    async fn get_run(
650        &self,
651        request: GrpcRequest<pb::GetWorkflowProviderRunRequest>,
652    ) -> std::result::Result<GrpcResponse<pb::WorkflowRun>, Status> {
653        let run = self
654            .provider
655            .get_run(request.into_inner())
656            .await
657            .map_err(|error| rpc_status("workflow get run", error))?;
658        Ok(GrpcResponse::new(run))
659    }
660
661    async fn get_run_events(
662        &self,
663        request: GrpcRequest<pb::GetWorkflowProviderRunEventsRequest>,
664    ) -> std::result::Result<GrpcResponse<pb::GetWorkflowProviderRunEventsResponse>, Status> {
665        let response = self
666            .provider
667            .get_run_events(request.into_inner())
668            .await
669            .map_err(|error| rpc_status("workflow get run events", error))?;
670        Ok(GrpcResponse::new(response))
671    }
672
673    async fn get_run_output(
674        &self,
675        request: GrpcRequest<pb::GetWorkflowProviderRunOutputRequest>,
676    ) -> std::result::Result<GrpcResponse<pb::GetWorkflowProviderRunOutputResponse>, Status> {
677        let response = self
678            .provider
679            .get_run_output(request.into_inner())
680            .await
681            .map_err(|error| rpc_status("workflow get run output", error))?;
682        Ok(GrpcResponse::new(response))
683    }
684
685    async fn cancel_run(
686        &self,
687        request: GrpcRequest<pb::CancelWorkflowProviderRunRequest>,
688    ) -> std::result::Result<GrpcResponse<pb::WorkflowRun>, Status> {
689        let run = self
690            .provider
691            .cancel_run(request.into_inner())
692            .await
693            .map_err(|error| rpc_status("workflow cancel run", error))?;
694        Ok(GrpcResponse::new(run))
695    }
696
697    async fn signal_run(
698        &self,
699        request: GrpcRequest<pb::SignalWorkflowProviderRunRequest>,
700    ) -> std::result::Result<GrpcResponse<pb::SignalWorkflowRunResponse>, Status> {
701        let response = self
702            .provider
703            .signal_run(request.into_inner())
704            .await
705            .map_err(|error| rpc_status("workflow signal run", error))?;
706        Ok(GrpcResponse::new(response))
707    }
708
709    async fn signal_or_start_run(
710        &self,
711        request: GrpcRequest<pb::SignalOrStartWorkflowProviderRunRequest>,
712    ) -> std::result::Result<GrpcResponse<pb::SignalWorkflowRunResponse>, Status> {
713        let response = self
714            .provider
715            .signal_or_start_run(request.into_inner())
716            .await
717            .map_err(|error| rpc_status("workflow signal or start run", error))?;
718        Ok(GrpcResponse::new(response))
719    }
720
721    async fn deliver_event(
722        &self,
723        request: GrpcRequest<pb::DeliverWorkflowProviderEventRequest>,
724    ) -> std::result::Result<GrpcResponse<pb::WorkflowEvent>, Status> {
725        let event = self
726            .provider
727            .deliver_event(request.into_inner())
728            .await
729            .map_err(|error| rpc_status("workflow deliver event", error))?;
730        Ok(GrpcResponse::new(event))
731    }
732}
733
734#[derive(Clone, Debug, Default, PartialEq)]
735/// One workflow callback execution request.
736pub struct WorkflowExecutionRequest {
737    /// The `provider_name` field.
738    pub provider_name: String,
739    /// The `run_id` field.
740    pub run_id: String,
741    /// The `target` field.
742    pub target: Option<BoundWorkflowTarget>,
743    /// The `trigger` field.
744    pub trigger: Option<WorkflowRunTrigger>,
745    /// The `input` field.
746    pub input: Option<Value>,
747    /// The `metadata` field.
748    pub metadata: Option<Value>,
749    /// The `signals` field.
750    pub signals: Vec<WorkflowSignal>,
751}
752
753#[derive(Clone, Debug, Default, PartialEq)]
754/// The run context a workflow value evaluates against.
755pub struct WorkflowEvalContext {
756    /// The `request` field.
757    pub request: WorkflowExecutionRequest,
758    /// The `outputs` field.
759    pub outputs: BTreeMap<String, Value>,
760    /// The `inputs` field.
761    pub inputs: BTreeMap<String, Value>,
762    /// The `allow_inputs` field.
763    pub allow_inputs: bool,
764}
765
766#[derive(Clone, Debug, PartialEq)]
767/// The resolved value of one workflow value expression.
768pub struct WorkflowEvalResult {
769    /// The `value` field.
770    pub value: Option<Value>,
771    /// The `resolved` field.
772    pub resolved: bool,
773}
774
775/// Resolves one workflow value expression against a run context.
776pub fn evaluate_workflow_value(
777    ctx: &WorkflowEvalContext,
778    value: &WorkflowValue,
779) -> Result<WorkflowEvalResult> {
780    match value.kind.as_ref() {
781        None => Ok(WorkflowEvalResult {
782            value: None,
783            resolved: true,
784        }),
785        Some(workflow_value::Kind::Literal(value)) => Ok(WorkflowEvalResult {
786            value: Some(protocol::json_from_value(value)),
787            resolved: true,
788        }),
789        Some(workflow_value::Kind::Object(object)) => {
790            let mut out = serde_json::Map::new();
791            for (key, nested) in &object.fields {
792                let resolved = evaluate_workflow_value(ctx, nested)?;
793                if !resolved.resolved {
794                    return Ok(WorkflowEvalResult {
795                        value: None,
796                        resolved: false,
797                    });
798                }
799                out.insert(key.clone(), resolved.value.unwrap_or(Value::Null));
800            }
801            Ok(WorkflowEvalResult {
802                value: Some(Value::Object(out)),
803                resolved: true,
804            })
805        }
806        Some(workflow_value::Kind::Array(array)) => {
807            let mut out = Vec::with_capacity(array.values.len());
808            for nested in &array.values {
809                let resolved = evaluate_workflow_value(ctx, nested)?;
810                if !resolved.resolved {
811                    return Ok(WorkflowEvalResult {
812                        value: None,
813                        resolved: false,
814                    });
815                }
816                out.push(resolved.value.unwrap_or(Value::Null));
817            }
818            Ok(WorkflowEvalResult {
819                value: Some(Value::Array(out)),
820                resolved: true,
821            })
822        }
823        Some(workflow_value::Kind::Template(text)) => Ok(WorkflowEvalResult {
824            value: Some(Value::String(render_workflow_template(
825                ctx,
826                &text.template,
827            )?)),
828            resolved: true,
829        }),
830        Some(workflow_value::Kind::Input(source)) => {
831            path_value_option(ctx.request.input.as_ref(), &source.path)
832        }
833        Some(workflow_value::Kind::Signal(source)) => latest_signal_payload(ctx)
834            .map(|payload| path_value(&payload, &source.path))
835            .unwrap_or_else(|| {
836                Ok(WorkflowEvalResult {
837                    value: None,
838                    resolved: false,
839                })
840            }),
841        Some(workflow_value::Kind::StepOutput(source)) => {
842            match ctx.outputs.get(source.step_id.trim()) {
843                Some(output) => path_value(output, &source.path),
844                None => Err(Error::bad_request(format!(
845                    "workflow step output references missing step {:?}",
846                    source.step_id.trim()
847                ))),
848            }
849        }
850        Some(workflow_value::Kind::StepInput(source)) => {
851            if !ctx.allow_inputs {
852                return Err(Error::bad_request(
853                    "step input references are not allowed here",
854                ));
855            }
856            match ctx.inputs.get(source.step_id.trim()) {
857                Some(input) => path_value(input, &source.path),
858                None => Ok(WorkflowEvalResult {
859                    value: None,
860                    resolved: false,
861                }),
862            }
863        }
864    }
865}
866
867/// Renders a workflow text template against provided values.
868pub fn render_workflow_template(ctx: &WorkflowEvalContext, template: &str) -> Result<String> {
869    let mut out = String::new();
870    let mut i = 0;
871    while i < template.len() {
872        let remaining = &template[i..];
873        if remaining.starts_with("$${{") {
874            out.push_str("${{");
875            i += 4;
876            continue;
877        }
878        if !remaining.starts_with("${{") {
879            let ch = remaining.chars().next().expect("non-empty string");
880            out.push(ch);
881            i += ch.len_utf8();
882            continue;
883        }
884        let end = remaining[3..]
885            .find("}}")
886            .ok_or_else(|| Error::bad_request("unterminated template expression"))?;
887        let expr = remaining[3..3 + end].trim();
888        let resolved = template_expression_value(ctx, expr)?;
889        if !resolved.resolved {
890            return Err(Error::bad_request(format!(
891                "template expression {expr:?} did not resolve"
892            )));
893        }
894        out.push_str(&render_template_value(
895            resolved.value.unwrap_or(Value::Null),
896        )?);
897        i += 3 + end + 2;
898    }
899    Ok(out)
900}
901
902/// Returns the most recent delivery of a named signal.
903pub fn latest_workflow_signal(signals: &[WorkflowSignal]) -> Option<&WorkflowSignal> {
904    signals.last()
905}
906
907/// Reads a dotted path from a JSON-like value.
908pub fn path_value(root: &Value, path: &str) -> Result<WorkflowEvalResult> {
909    if path.trim().is_empty() {
910        return Ok(WorkflowEvalResult {
911            value: Some(root.clone()),
912            resolved: true,
913        });
914    }
915    let mut current = root;
916    for segment in path_segments(path)? {
917        match (current, segment) {
918            (Value::Object(map), PathSegment::Key(key)) => match map.get(&key) {
919                Some(next) => current = next,
920                None => {
921                    return Ok(WorkflowEvalResult {
922                        value: None,
923                        resolved: false,
924                    });
925                }
926            },
927            (Value::Array(values), PathSegment::Index(index)) if index < values.len() => {
928                current = &values[index];
929            }
930            _ => {
931                return Ok(WorkflowEvalResult {
932                    value: None,
933                    resolved: false,
934                });
935            }
936        }
937    }
938    Ok(WorkflowEvalResult {
939        value: Some(current.clone()),
940        resolved: true,
941    })
942}
943
944fn path_value_option(root: Option<&Value>, path: &str) -> Result<WorkflowEvalResult> {
945    match root {
946        Some(root) => path_value(root, path),
947        None => Ok(WorkflowEvalResult {
948            value: None,
949            resolved: false,
950        }),
951    }
952}
953
954fn template_expression_value(ctx: &WorkflowEvalContext, expr: &str) -> Result<WorkflowEvalResult> {
955    if expr == "input" {
956        return path_value_option(ctx.request.input.as_ref(), "");
957    }
958    if let Some(path) = expr.strip_prefix("input.") {
959        return path_value_option(ctx.request.input.as_ref(), path);
960    }
961    if expr == "signal" {
962        return latest_signal_payload(ctx)
963            .map(|payload| path_value(&payload, ""))
964            .unwrap_or_else(|| {
965                Ok(WorkflowEvalResult {
966                    value: None,
967                    resolved: false,
968                })
969            });
970    }
971    if let Some(path) = expr.strip_prefix("signal.") {
972        return latest_signal_payload(ctx)
973            .map(|payload| path_value(&payload, path))
974            .unwrap_or_else(|| {
975                Ok(WorkflowEvalResult {
976                    value: None,
977                    resolved: false,
978                })
979            });
980    }
981    if let Some(path) = expr.strip_prefix("steps.") {
982        return step_expression_value(ctx, path);
983    }
984    Err(Error::bad_request(format!(
985        "unsupported template expression {expr:?}"
986    )))
987}
988
989fn step_expression_value(ctx: &WorkflowEvalContext, path: &str) -> Result<WorkflowEvalResult> {
990    let Some((step_id, rest)) = path.split_once('.') else {
991        return Err(Error::bad_request(format!(
992            "unsupported step template expression {path:?}"
993        )));
994    };
995    if let Some(path) = rest.strip_prefix("outputs") {
996        let path = path.strip_prefix('.').unwrap_or("");
997        return match ctx.outputs.get(step_id.trim()) {
998            Some(output) => path_value(output, path),
999            None => Err(Error::bad_request(format!(
1000                "workflow step output references missing step {:?}",
1001                step_id.trim()
1002            ))),
1003        };
1004    }
1005    if let Some(path) = rest.strip_prefix("inputs") {
1006        if !ctx.allow_inputs {
1007            return Err(Error::bad_request(
1008                "step input references are not allowed here",
1009            ));
1010        }
1011        let path = path.strip_prefix('.').unwrap_or("");
1012        return match ctx.inputs.get(step_id.trim()) {
1013            Some(input) => path_value(input, path),
1014            None => Ok(WorkflowEvalResult {
1015                value: None,
1016                resolved: false,
1017            }),
1018        };
1019    }
1020    Err(Error::bad_request(format!(
1021        "unsupported step template expression {path:?}"
1022    )))
1023}
1024
1025fn latest_signal_payload(ctx: &WorkflowEvalContext) -> Option<Value> {
1026    latest_workflow_signal(&ctx.request.signals)
1027        .and_then(|signal| signal.payload.as_ref())
1028        .map(protocol::json_from_struct)
1029}
1030
1031fn render_template_value(value: Value) -> Result<String> {
1032    match value {
1033        Value::String(value) => Ok(value),
1034        other => Ok(serde_json::to_string(&other)?),
1035    }
1036}
1037
1038#[derive(Debug)]
1039enum PathSegment {
1040    Key(String),
1041    Index(usize),
1042}
1043
1044fn path_segments(path: &str) -> Result<Vec<PathSegment>> {
1045    let chars: Vec<char> = path.trim().chars().collect();
1046    let mut out = Vec::new();
1047    let mut i = 0;
1048    while i < chars.len() {
1049        match chars[i] {
1050            '.' => i += 1,
1051            '[' => {
1052                let start = i + 1;
1053                let mut end = start;
1054                while end < chars.len() && chars[end] != ']' {
1055                    end += 1;
1056                }
1057                if end >= chars.len() {
1058                    return Err(Error::bad_request(format!(
1059                        "invalid workflow path {path:?}"
1060                    )));
1061                }
1062                let token: String = chars[start..end].iter().collect();
1063                let token = token.trim();
1064                if token.starts_with('"') || token.starts_with('\'') {
1065                    out.push(PathSegment::Key(unquote_path_key(token, path)?));
1066                } else {
1067                    out.push(PathSegment::Index(token.parse().map_err(|_| {
1068                        Error::bad_request(format!("invalid workflow path {path:?}"))
1069                    })?));
1070                }
1071                i = end + 1;
1072            }
1073            _ => {
1074                let start = i;
1075                while i < chars.len() && chars[i] != '.' && chars[i] != '[' {
1076                    i += 1;
1077                }
1078                let key: String = chars[start..i].iter().collect::<String>().trim().to_owned();
1079                if key.is_empty() {
1080                    return Err(Error::bad_request(format!(
1081                        "invalid workflow path {path:?}"
1082                    )));
1083                }
1084                out.push(PathSegment::Key(key));
1085            }
1086        }
1087    }
1088    Ok(out)
1089}
1090
1091fn unquote_path_key(token: &str, path: &str) -> Result<String> {
1092    if token.starts_with('"') {
1093        return serde_json::from_str(token)
1094            .map_err(|_| Error::bad_request(format!("invalid workflow path {path:?}")));
1095    }
1096    if token.len() < 2 || !token.ends_with('\'') {
1097        return Err(Error::bad_request(format!(
1098            "invalid workflow path {path:?}"
1099        )));
1100    }
1101    let mut out = String::new();
1102    let mut chars = token[1..token.len() - 1].chars();
1103    while let Some(ch) = chars.next() {
1104        if ch != '\\' {
1105            out.push(ch);
1106            continue;
1107        }
1108        let escaped = chars
1109            .next()
1110            .ok_or_else(|| Error::bad_request(format!("invalid workflow path {path:?}")))?;
1111        match escaped {
1112            '\'' | '"' | '\\' => out.push(escaped),
1113            'n' => out.push('\n'),
1114            'r' => out.push('\r'),
1115            't' => out.push('\t'),
1116            'u' => {
1117                let mut hex = String::new();
1118                for _ in 0..4 {
1119                    hex.push(chars.next().ok_or_else(|| {
1120                        Error::bad_request(format!("invalid workflow path {path:?}"))
1121                    })?);
1122                }
1123                let code = u32::from_str_radix(&hex, 16)
1124                    .map_err(|_| Error::bad_request(format!("invalid workflow path {path:?}")))?;
1125                let value = char::from_u32(code)
1126                    .ok_or_else(|| Error::bad_request(format!("invalid workflow path {path:?}")))?;
1127                out.push(value);
1128            }
1129            other => out.push(other),
1130        }
1131    }
1132    Ok(out)
1133}
1134
1135#[cfg(test)]
1136mod tests {
1137    use serde_json::json;
1138
1139    use crate::protocol;
1140    use crate::workflow_provider::{
1141        WorkflowEvalContext, WorkflowExecutionRequest, WorkflowSignal, evaluate_workflow_value,
1142        path_value, render_workflow_template, workflow_value_input,
1143    };
1144
1145    #[test]
1146    fn evaluates_current_templates_and_paths() {
1147        let ctx = WorkflowEvalContext {
1148            request: WorkflowExecutionRequest {
1149                provider_name: "indexeddb".to_owned(),
1150                run_id: "run-1".to_owned(),
1151                input: Some(json!({"customer": {"id": "cust_1"}})),
1152                signals: vec![WorkflowSignal {
1153                    id: "sig-1".to_owned(),
1154                    payload: Some(
1155                        protocol::struct_from_json(json!({"thread": {"ts": "123.456"}})).unwrap(),
1156                    ),
1157                    ..Default::default()
1158                }],
1159                ..Default::default()
1160            },
1161            inputs: [("draft".to_owned(), json!({"title": "PR"}))].into(),
1162            outputs: [("diagnosis".to_owned(), json!({"action": "open_pr"}))].into(),
1163            allow_inputs: true,
1164        };
1165
1166        assert_eq!(
1167            render_workflow_template(
1168                &ctx,
1169                "customer=${{ input.customer.id }}; thread=${{ signal.thread.ts }}; action=${{ steps.diagnosis.outputs.action }}; title=${{ steps.draft.inputs.title }}; literal=$${{x}}",
1170            )
1171            .unwrap(),
1172            "customer=cust_1; thread=123.456; action=open_pr; title=PR; literal=${{x}}"
1173        );
1174        assert_eq!(
1175            evaluate_workflow_value(&ctx, &workflow_value_input("customer.id"))
1176                .unwrap()
1177                .value,
1178            Some(json!("cust_1"))
1179        );
1180        assert_eq!(
1181            path_value(
1182                &json!({"quote'key": {"value": 42}}),
1183                "['quote\\'key'].value"
1184            )
1185            .unwrap()
1186            .value,
1187            Some(json!(42))
1188        );
1189    }
1190}