1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use serde::Serialize;
5use serde_json::Value;
6use tonic::codegen::async_trait;
7use tonic::{Request as GrpcRequest, Response as GrpcResponse, Status};
8
9use crate::api::RuntimeMetadata;
10use crate::error::Result as ProviderResult;
11use crate::generated::v1 as pb;
12use crate::protocol;
13use crate::rpc_status::rpc_status;
14use crate::{Error, Result};
15
/// Alias for [`serde_json::Value`].
pub type WorkflowJson = serde_json::Value;

// Generated submodules re-exported so nested items (for example
// `workflow_value::Kind` and `workflow_step::Action`) can be named from here.
pub use pb::{workflow_activation, workflow_run_trigger, workflow_step, workflow_value};

/// Alias for the generated `pb::BoundWorkflowTarget`.
pub type BoundWorkflowTarget = pb::BoundWorkflowTarget;
/// Alias for the generated `pb::WorkflowStep`.
pub type WorkflowStep = pb::WorkflowStep;
/// Alias for the generated `pb::workflow_step::Action`.
pub type WorkflowStepAction = pb::workflow_step::Action;
/// Alias for the generated `pb::WorkflowStepAppCall`.
pub type WorkflowStepAppCall = pb::WorkflowStepAppCall;
/// Alias for the generated `pb::WorkflowStepAgentTurn`.
pub type WorkflowStepAgentTurn = pb::WorkflowStepAgentTurn;
/// Alias for the generated `pb::WorkflowAgentMessage`.
pub type WorkflowAgentMessage = pb::WorkflowAgentMessage;
/// Alias for the generated `pb::WorkflowText` (holds a `template` string).
pub type WorkflowText = pb::WorkflowText;
/// Alias for the generated `pb::WorkflowStepWhen`.
pub type WorkflowStepWhen = pb::WorkflowStepWhen;
/// Alias for the generated `pb::WorkflowValue`; its `kind` selects the variant.
pub type WorkflowValue = pb::WorkflowValue;
/// Alias for the generated `pb::WorkflowObject` (named `fields` of values).
pub type WorkflowObject = pb::WorkflowObject;
/// Alias for the generated `pb::WorkflowArray` (ordered `values`).
pub type WorkflowArray = pb::WorkflowArray;
/// Alias for the generated `pb::WorkflowPathSource` (a `path` string).
pub type WorkflowPathSource = pb::WorkflowPathSource;
/// Alias for the generated `pb::WorkflowStepOutputSource` (`step_id` + `path`).
pub type WorkflowStepOutputSource = pb::WorkflowStepOutputSource;
/// Alias for the generated `pb::WorkflowStepInputSource` (`step_id` + `path`).
pub type WorkflowStepInputSource = pb::WorkflowStepInputSource;
/// Alias for the generated `pb::WorkflowEvent`.
pub type WorkflowEvent = pb::WorkflowEvent;
/// Alias for the generated `pb::WorkflowEventMatch`.
pub type WorkflowEventMatch = pb::WorkflowEventMatch;
/// Alias for the generated `pb::WorkflowScheduleActivation`.
pub type WorkflowScheduleActivation = pb::WorkflowScheduleActivation;
/// Alias for the generated `pb::WorkflowEventActivation`.
pub type WorkflowEventActivation = pb::WorkflowEventActivation;
/// Alias for the generated `pb::WorkflowActivation`.
pub type WorkflowActivation = pb::WorkflowActivation;
/// Alias for the generated `pb::WorkflowDefinitionSpec`.
pub type WorkflowDefinitionSpec = pb::WorkflowDefinitionSpec;
/// Alias for the generated `pb::WorkflowDefinition`.
pub type WorkflowDefinition = pb::WorkflowDefinition;
/// Alias for the generated `pb::WorkflowManualTrigger`.
pub type WorkflowManualTrigger = pb::WorkflowManualTrigger;
/// Alias for the generated `pb::WorkflowScheduleTrigger`.
pub type WorkflowScheduleTrigger = pb::WorkflowScheduleTrigger;
/// Alias for the generated `pb::WorkflowEventTriggerInvocation`.
pub type WorkflowEventTriggerInvocation = pb::WorkflowEventTriggerInvocation;
/// Alias for the generated `pb::WorkflowRunTrigger`.
pub type WorkflowRunTrigger = pb::WorkflowRunTrigger;
/// Alias for the generated `pb::WorkflowStepAttempt`.
pub type WorkflowStepAttempt = pb::WorkflowStepAttempt;
/// Alias for the generated `pb::WorkflowStepExecution`.
pub type WorkflowStepExecution = pb::WorkflowStepExecution;
/// Alias for the generated `pb::WorkflowRun`.
pub type WorkflowRun = pb::WorkflowRun;
/// Alias for the generated `pb::WorkflowSignal` (carries an optional `payload`).
pub type WorkflowSignal = pb::WorkflowSignal;
/// Alias for the generated `pb::SignalWorkflowRunResponse`.
pub type SignalWorkflowRunResponse = pb::SignalWorkflowRunResponse;
/// Alias for the generated `pb::WorkflowRunEvent`.
pub type WorkflowRunEvent = pb::WorkflowRunEvent;
/// Alias for the generated `pb::WorkflowRunStatus`.
pub type WorkflowRunStatus = pb::WorkflowRunStatus;
/// Alias for the generated `pb::WorkflowStepStatus`.
pub type WorkflowStepStatus = pb::WorkflowStepStatus;
87
/// Request type accepted by [`WorkflowProvider::apply_definition`].
pub type ApplyWorkflowProviderDefinitionRequest = pb::ApplyWorkflowProviderDefinitionRequest;
/// Request type accepted by [`WorkflowProvider::get_definition`].
pub type GetWorkflowProviderDefinitionRequest = pb::GetWorkflowProviderDefinitionRequest;
/// Request type accepted by [`WorkflowProvider::list_definitions`].
pub type ListWorkflowProviderDefinitionsRequest = pb::ListWorkflowProviderDefinitionsRequest;
/// Response type returned by [`WorkflowProvider::list_definitions`].
pub type ListWorkflowProviderDefinitionsResponse = pb::ListWorkflowProviderDefinitionsResponse;
/// Request type accepted by [`WorkflowProvider::set_definition_paused`].
pub type SetWorkflowProviderDefinitionPausedRequest =
    pb::SetWorkflowProviderDefinitionPausedRequest;
/// Request type accepted by [`WorkflowProvider::set_activation_paused`].
pub type SetWorkflowProviderActivationPausedRequest =
    pb::SetWorkflowProviderActivationPausedRequest;
/// Request type accepted by [`WorkflowProvider::delete_definition`].
pub type DeleteWorkflowProviderDefinitionRequest = pb::DeleteWorkflowProviderDefinitionRequest;
/// Request type accepted by [`WorkflowProvider::start_run`].
pub type StartWorkflowProviderRunRequest = pb::StartWorkflowProviderRunRequest;
/// Request type accepted by [`WorkflowProvider::get_run`].
pub type GetWorkflowProviderRunRequest = pb::GetWorkflowProviderRunRequest;
/// Request type accepted by [`WorkflowProvider::list_runs`].
pub type ListWorkflowProviderRunsRequest = pb::ListWorkflowProviderRunsRequest;
/// Response type returned by [`WorkflowProvider::list_runs`].
pub type ListWorkflowProviderRunsResponse = pb::ListWorkflowProviderRunsResponse;
/// Request type accepted by [`WorkflowProvider::get_run_events`].
pub type GetWorkflowProviderRunEventsRequest = pb::GetWorkflowProviderRunEventsRequest;
/// Response type returned by [`WorkflowProvider::get_run_events`].
pub type GetWorkflowProviderRunEventsResponse = pb::GetWorkflowProviderRunEventsResponse;
/// Request type accepted by [`WorkflowProvider::get_run_output`].
pub type GetWorkflowProviderRunOutputRequest = pb::GetWorkflowProviderRunOutputRequest;
/// Response type returned by [`WorkflowProvider::get_run_output`].
pub type GetWorkflowProviderRunOutputResponse = pb::GetWorkflowProviderRunOutputResponse;
/// Request type accepted by [`WorkflowProvider::cancel_run`].
pub type CancelWorkflowProviderRunRequest = pb::CancelWorkflowProviderRunRequest;
/// Request type accepted by [`WorkflowProvider::signal_run`].
pub type SignalWorkflowProviderRunRequest = pb::SignalWorkflowProviderRunRequest;
/// Request type accepted by [`WorkflowProvider::signal_or_start_run`].
pub type SignalOrStartWorkflowProviderRunRequest = pb::SignalOrStartWorkflowProviderRunRequest;
/// Request type accepted by [`WorkflowProvider::deliver_event`].
pub type DeliverWorkflowProviderEventRequest = pb::DeliverWorkflowProviderEventRequest;
128
/// Returns `input` unchanged, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_bound_workflow_target(
    input: BoundWorkflowTarget,
) -> ProviderResult<BoundWorkflowTarget> {
    Ok(input)
}
135
/// Returns an owned clone of `input`, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_bound_workflow_target_from_target(
    input: &BoundWorkflowTarget,
) -> ProviderResult<BoundWorkflowTarget> {
    Ok(input.clone())
}
142
/// Returns `input` unchanged, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_definition_spec(
    input: WorkflowDefinitionSpec,
) -> ProviderResult<WorkflowDefinitionSpec> {
    Ok(input)
}
149
/// Returns `input` unchanged, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_definition(input: WorkflowDefinition) -> ProviderResult<WorkflowDefinition> {
    Ok(input)
}
154
/// Returns `input` unchanged, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_run(input: WorkflowRun) -> ProviderResult<WorkflowRun> {
    Ok(input)
}
159
/// Returns an owned clone of `input`, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_run_from_run(input: &WorkflowRun) -> ProviderResult<WorkflowRun> {
    Ok(input.clone())
}
164
/// Returns `input` unchanged, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_event(input: WorkflowEvent) -> ProviderResult<WorkflowEvent> {
    Ok(input)
}
169
/// Returns an owned clone of `input`, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_event_from_event(input: &WorkflowEvent) -> ProviderResult<WorkflowEvent> {
    Ok(input.clone())
}
174
/// Returns `input` unchanged.
///
/// Unlike most sibling constructors this one is infallible by signature.
pub fn new_workflow_event_match(input: WorkflowEventMatch) -> WorkflowEventMatch {
    input
}
179
/// Returns `input` unchanged, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_signal(input: WorkflowSignal) -> ProviderResult<WorkflowSignal> {
    Ok(input)
}
184
/// Returns an owned clone of `input`, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_signal_from_signal(input: &WorkflowSignal) -> ProviderResult<WorkflowSignal> {
    Ok(input.clone())
}
189
/// Returns `input` unchanged, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_step(input: WorkflowStep) -> ProviderResult<WorkflowStep> {
    Ok(input)
}
194
/// Returns `input` unchanged, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_step_app_call(
    input: WorkflowStepAppCall,
) -> ProviderResult<WorkflowStepAppCall> {
    Ok(input)
}
201
/// Returns `input` unchanged, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_step_agent_turn(
    input: WorkflowStepAgentTurn,
) -> ProviderResult<WorkflowStepAgentTurn> {
    Ok(input)
}
208
/// Returns `input` unchanged, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_agent_message(
    input: WorkflowAgentMessage,
) -> ProviderResult<WorkflowAgentMessage> {
    Ok(input)
}
215
/// Returns `input` unchanged, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_step_when(input: WorkflowStepWhen) -> ProviderResult<WorkflowStepWhen> {
    Ok(input)
}
220
/// Returns `input` unchanged.
///
/// Unlike most sibling constructors this one is infallible by signature.
pub fn new_workflow_text(input: WorkflowText) -> WorkflowText {
    input
}
225
/// Returns `input` unchanged, wrapped in `Ok`.
///
/// The body performs no validation, so this currently never returns `Err`.
pub fn new_workflow_value(input: WorkflowValue) -> ProviderResult<WorkflowValue> {
    Ok(input)
}
230
231pub fn workflow_value_literal<T: Serialize>(value: T) -> ProviderResult<WorkflowValue> {
233 Ok(WorkflowValue {
234 kind: Some(workflow_value::Kind::Literal(protocol::value_from_json(
235 protocol::json_from_serializable(value)?,
236 ))),
237 })
238}
239
240pub fn workflow_value_object(fields: BTreeMap<String, WorkflowValue>) -> WorkflowValue {
242 WorkflowValue {
243 kind: Some(workflow_value::Kind::Object(WorkflowObject { fields })),
244 }
245}
246
247pub fn workflow_value_array(values: Vec<WorkflowValue>) -> WorkflowValue {
249 WorkflowValue {
250 kind: Some(workflow_value::Kind::Array(WorkflowArray { values })),
251 }
252}
253
254pub fn workflow_value_template(template: impl Into<String>) -> WorkflowValue {
256 WorkflowValue {
257 kind: Some(workflow_value::Kind::Template(WorkflowText {
258 template: template.into(),
259 })),
260 }
261}
262
263pub fn workflow_value_input(path: impl Into<String>) -> WorkflowValue {
265 WorkflowValue {
266 kind: Some(workflow_value::Kind::Input(WorkflowPathSource {
267 path: path.into(),
268 })),
269 }
270}
271
272pub fn workflow_value_signal(path: impl Into<String>) -> WorkflowValue {
274 WorkflowValue {
275 kind: Some(workflow_value::Kind::Signal(WorkflowPathSource {
276 path: path.into(),
277 })),
278 }
279}
280
281pub fn workflow_value_step_output(
283 step_id: impl Into<String>,
284 path: impl Into<String>,
285) -> WorkflowValue {
286 WorkflowValue {
287 kind: Some(workflow_value::Kind::StepOutput(WorkflowStepOutputSource {
288 step_id: step_id.into(),
289 path: path.into(),
290 })),
291 }
292}
293
294pub fn workflow_value_step_input(
296 step_id: impl Into<String>,
297 path: impl Into<String>,
298) -> WorkflowValue {
299 WorkflowValue {
300 kind: Some(workflow_value::Kind::StepInput(WorkflowStepInputSource {
301 step_id: step_id.into(),
302 path: path.into(),
303 })),
304 }
305}
306
/// Returns an owned clone of `input`.
pub fn workflow_event_input_from_event(input: &WorkflowEvent) -> WorkflowEvent {
    input.clone()
}
311
/// Returns an owned clone of `input`.
pub fn workflow_event_match_input_from_match(input: &WorkflowEventMatch) -> WorkflowEventMatch {
    input.clone()
}
316
/// Returns an owned clone of `input`.
pub fn workflow_signal_input_from_signal(input: &WorkflowSignal) -> WorkflowSignal {
    input.clone()
}
321
/// Returns an owned clone of `input`.
pub fn workflow_step_input_from_step(input: &WorkflowStep) -> WorkflowStep {
    input.clone()
}
326
/// Returns an owned clone of `input`.
pub fn workflow_step_app_call_input_from_call(input: &WorkflowStepAppCall) -> WorkflowStepAppCall {
    input.clone()
}
331
/// Returns an owned clone of `input`.
pub fn workflow_step_agent_turn_input_from_turn(
    input: &WorkflowStepAgentTurn,
) -> WorkflowStepAgentTurn {
    input.clone()
}
338
/// Returns an owned clone of `input`.
pub fn workflow_value_input_from_value(input: &WorkflowValue) -> WorkflowValue {
    input.clone()
}
343
/// Returns an owned clone of `input`.
pub fn workflow_run_trigger_input_from_trigger(input: &WorkflowRunTrigger) -> WorkflowRunTrigger {
    input.clone()
}
348
/// Interface implemented by a workflow provider.
///
/// Every method has a default body, so an implementation only overrides what
/// it supports. The lifecycle-style hooks default to success, while every
/// definition/run/event operation defaults to an error built with
/// `Error::unimplemented`. In this file the operations are invoked by the
/// gRPC adapter `WorkflowServer`, which forwards each request to the
/// same-named method.
#[async_trait]
pub trait WorkflowProvider: Send + Sync + 'static {
    /// Receives the provider's `_name` and `_config` map.
    ///
    /// The default implementation ignores both and returns `Ok(())`.
    async fn configure(
        &self,
        _name: &str,
        _config: serde_json::Map<String, serde_json::Value>,
    ) -> ProviderResult<()> {
        Ok(())
    }

    /// Optional runtime metadata for this provider; defaults to `None`.
    fn metadata(&self) -> Option<RuntimeMetadata> {
        None
    }

    /// Warning strings reported by this provider; defaults to an empty list.
    fn warnings(&self) -> Vec<String> {
        Vec::new()
    }

    /// Health probe; the default implementation always returns `Ok(())`.
    async fn health_check(&self) -> ProviderResult<()> {
        Ok(())
    }

    /// Start hook; the default implementation does nothing and returns `Ok(())`.
    async fn start(&self) -> ProviderResult<()> {
        Ok(())
    }

    /// Close hook; the default implementation does nothing and returns `Ok(())`.
    async fn close(&self) -> ProviderResult<()> {
        Ok(())
    }

    /// Applies a workflow definition and returns the resulting definition.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn apply_definition(
        &self,
        _request: ApplyWorkflowProviderDefinitionRequest,
    ) -> ProviderResult<WorkflowDefinition> {
        Err(crate::Error::unimplemented(
            "workflow apply definition is not implemented",
        ))
    }

    /// Fetches a single workflow definition.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn get_definition(
        &self,
        _request: GetWorkflowProviderDefinitionRequest,
    ) -> ProviderResult<WorkflowDefinition> {
        Err(crate::Error::unimplemented(
            "workflow get definition is not implemented",
        ))
    }

    /// Lists workflow definitions.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn list_definitions(
        &self,
        _request: ListWorkflowProviderDefinitionsRequest,
    ) -> ProviderResult<ListWorkflowProviderDefinitionsResponse> {
        Err(crate::Error::unimplemented(
            "workflow list definitions is not implemented",
        ))
    }

    /// Changes the paused state of a definition and returns the definition.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn set_definition_paused(
        &self,
        _request: SetWorkflowProviderDefinitionPausedRequest,
    ) -> ProviderResult<WorkflowDefinition> {
        Err(crate::Error::unimplemented(
            "workflow set definition paused is not implemented",
        ))
    }

    /// Changes the paused state of an activation and returns the definition.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn set_activation_paused(
        &self,
        _request: SetWorkflowProviderActivationPausedRequest,
    ) -> ProviderResult<WorkflowDefinition> {
        Err(crate::Error::unimplemented(
            "workflow set activation paused is not implemented",
        ))
    }

    /// Deletes a workflow definition.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn delete_definition(
        &self,
        _request: DeleteWorkflowProviderDefinitionRequest,
    ) -> ProviderResult<()> {
        Err(crate::Error::unimplemented(
            "workflow delete definition is not implemented",
        ))
    }

    /// Starts a workflow run and returns it.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn start_run(
        &self,
        _request: StartWorkflowProviderRunRequest,
    ) -> ProviderResult<WorkflowRun> {
        Err(crate::Error::unimplemented(
            "workflow start run is not implemented",
        ))
    }

    /// Lists workflow runs.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn list_runs(
        &self,
        _request: ListWorkflowProviderRunsRequest,
    ) -> ProviderResult<ListWorkflowProviderRunsResponse> {
        Err(crate::Error::unimplemented(
            "workflow list runs is not implemented",
        ))
    }

    /// Fetches a single workflow run.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn get_run(
        &self,
        _request: GetWorkflowProviderRunRequest,
    ) -> ProviderResult<WorkflowRun> {
        Err(crate::Error::unimplemented(
            "workflow get run is not implemented",
        ))
    }

    /// Fetches the events recorded for a workflow run.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn get_run_events(
        &self,
        _request: GetWorkflowProviderRunEventsRequest,
    ) -> ProviderResult<GetWorkflowProviderRunEventsResponse> {
        Err(crate::Error::unimplemented(
            "workflow get run events is not implemented",
        ))
    }

    /// Fetches the output of a workflow run.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn get_run_output(
        &self,
        _request: GetWorkflowProviderRunOutputRequest,
    ) -> ProviderResult<GetWorkflowProviderRunOutputResponse> {
        Err(crate::Error::unimplemented(
            "workflow get run output is not implemented",
        ))
    }

    /// Cancels a workflow run and returns it.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn cancel_run(
        &self,
        _request: CancelWorkflowProviderRunRequest,
    ) -> ProviderResult<WorkflowRun> {
        Err(crate::Error::unimplemented(
            "workflow cancel run is not implemented",
        ))
    }

    /// Sends a signal to a workflow run.
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn signal_run(
        &self,
        _request: SignalWorkflowProviderRunRequest,
    ) -> ProviderResult<SignalWorkflowRunResponse> {
        Err(crate::Error::unimplemented(
            "workflow signal run is not implemented",
        ))
    }

    /// Signals a workflow run, or starts one (exact semantics are defined by
    /// the implementation; nothing in this file specifies them).
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn signal_or_start_run(
        &self,
        _request: SignalOrStartWorkflowProviderRunRequest,
    ) -> ProviderResult<SignalWorkflowRunResponse> {
        Err(crate::Error::unimplemented(
            "workflow signal or start run is not implemented",
        ))
    }

    /// Delivers an event to the provider and returns a [`WorkflowEvent`].
    ///
    /// The default implementation returns an `unimplemented` error.
    async fn deliver_event(
        &self,
        _request: DeliverWorkflowProviderEventRequest,
    ) -> ProviderResult<WorkflowEvent> {
        Err(crate::Error::unimplemented(
            "workflow deliver event is not implemented",
        ))
    }
}
536
/// gRPC adapter that forwards generated `Workflow` service calls to a shared
/// [`WorkflowProvider`] implementation.
pub(crate) struct WorkflowServer<P> {
    /// Provider that handles every request; shared through an `Arc`.
    provider: Arc<P>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `P: Clone` bound, although cloning only duplicates the `Arc` handle and
// never the provider itself.
impl<P> Clone for WorkflowServer<P> {
    fn clone(&self) -> Self {
        Self {
            provider: Arc::clone(&self.provider),
        }
    }
}
541
impl<P> WorkflowServer<P> {
    /// Wraps `provider` so it can be served through the generated gRPC trait.
    pub(crate) fn new(provider: Arc<P>) -> Self {
        Self { provider }
    }
}
547
548#[async_trait]
549impl<P> pb::workflow_server::Workflow for WorkflowServer<P>
550where
551 P: WorkflowProvider,
552{
553 async fn apply_definition(
554 &self,
555 request: GrpcRequest<pb::ApplyWorkflowProviderDefinitionRequest>,
556 ) -> std::result::Result<GrpcResponse<pb::WorkflowDefinition>, Status> {
557 let definition = self
558 .provider
559 .apply_definition(request.into_inner())
560 .await
561 .map_err(|error| rpc_status("workflow apply definition", error))?;
562 Ok(GrpcResponse::new(definition))
563 }
564
565 async fn get_definition(
566 &self,
567 request: GrpcRequest<pb::GetWorkflowProviderDefinitionRequest>,
568 ) -> std::result::Result<GrpcResponse<pb::WorkflowDefinition>, Status> {
569 let definition = self
570 .provider
571 .get_definition(request.into_inner())
572 .await
573 .map_err(|error| rpc_status("workflow get definition", error))?;
574 Ok(GrpcResponse::new(definition))
575 }
576
577 async fn list_definitions(
578 &self,
579 request: GrpcRequest<pb::ListWorkflowProviderDefinitionsRequest>,
580 ) -> std::result::Result<GrpcResponse<pb::ListWorkflowProviderDefinitionsResponse>, Status>
581 {
582 let response = self
583 .provider
584 .list_definitions(request.into_inner())
585 .await
586 .map_err(|error| rpc_status("workflow list definitions", error))?;
587 Ok(GrpcResponse::new(response))
588 }
589
590 async fn set_definition_paused(
591 &self,
592 request: GrpcRequest<pb::SetWorkflowProviderDefinitionPausedRequest>,
593 ) -> std::result::Result<GrpcResponse<pb::WorkflowDefinition>, Status> {
594 let definition = self
595 .provider
596 .set_definition_paused(request.into_inner())
597 .await
598 .map_err(|error| rpc_status("workflow set definition paused", error))?;
599 Ok(GrpcResponse::new(definition))
600 }
601
602 async fn set_activation_paused(
603 &self,
604 request: GrpcRequest<pb::SetWorkflowProviderActivationPausedRequest>,
605 ) -> std::result::Result<GrpcResponse<pb::WorkflowDefinition>, Status> {
606 let definition = self
607 .provider
608 .set_activation_paused(request.into_inner())
609 .await
610 .map_err(|error| rpc_status("workflow set activation paused", error))?;
611 Ok(GrpcResponse::new(definition))
612 }
613
614 async fn delete_definition(
615 &self,
616 request: GrpcRequest<pb::DeleteWorkflowProviderDefinitionRequest>,
617 ) -> std::result::Result<GrpcResponse<()>, Status> {
618 self.provider
619 .delete_definition(request.into_inner())
620 .await
621 .map_err(|error| rpc_status("workflow delete definition", error))?;
622 Ok(GrpcResponse::new(()))
623 }
624
625 async fn start_run(
626 &self,
627 request: GrpcRequest<pb::StartWorkflowProviderRunRequest>,
628 ) -> std::result::Result<GrpcResponse<pb::WorkflowRun>, Status> {
629 let run = self
630 .provider
631 .start_run(request.into_inner())
632 .await
633 .map_err(|error| rpc_status("workflow start run", error))?;
634 Ok(GrpcResponse::new(run))
635 }
636
637 async fn list_runs(
638 &self,
639 request: GrpcRequest<pb::ListWorkflowProviderRunsRequest>,
640 ) -> std::result::Result<GrpcResponse<pb::ListWorkflowProviderRunsResponse>, Status> {
641 let response = self
642 .provider
643 .list_runs(request.into_inner())
644 .await
645 .map_err(|error| rpc_status("workflow list runs", error))?;
646 Ok(GrpcResponse::new(response))
647 }
648
649 async fn get_run(
650 &self,
651 request: GrpcRequest<pb::GetWorkflowProviderRunRequest>,
652 ) -> std::result::Result<GrpcResponse<pb::WorkflowRun>, Status> {
653 let run = self
654 .provider
655 .get_run(request.into_inner())
656 .await
657 .map_err(|error| rpc_status("workflow get run", error))?;
658 Ok(GrpcResponse::new(run))
659 }
660
661 async fn get_run_events(
662 &self,
663 request: GrpcRequest<pb::GetWorkflowProviderRunEventsRequest>,
664 ) -> std::result::Result<GrpcResponse<pb::GetWorkflowProviderRunEventsResponse>, Status> {
665 let response = self
666 .provider
667 .get_run_events(request.into_inner())
668 .await
669 .map_err(|error| rpc_status("workflow get run events", error))?;
670 Ok(GrpcResponse::new(response))
671 }
672
673 async fn get_run_output(
674 &self,
675 request: GrpcRequest<pb::GetWorkflowProviderRunOutputRequest>,
676 ) -> std::result::Result<GrpcResponse<pb::GetWorkflowProviderRunOutputResponse>, Status> {
677 let response = self
678 .provider
679 .get_run_output(request.into_inner())
680 .await
681 .map_err(|error| rpc_status("workflow get run output", error))?;
682 Ok(GrpcResponse::new(response))
683 }
684
685 async fn cancel_run(
686 &self,
687 request: GrpcRequest<pb::CancelWorkflowProviderRunRequest>,
688 ) -> std::result::Result<GrpcResponse<pb::WorkflowRun>, Status> {
689 let run = self
690 .provider
691 .cancel_run(request.into_inner())
692 .await
693 .map_err(|error| rpc_status("workflow cancel run", error))?;
694 Ok(GrpcResponse::new(run))
695 }
696
697 async fn signal_run(
698 &self,
699 request: GrpcRequest<pb::SignalWorkflowProviderRunRequest>,
700 ) -> std::result::Result<GrpcResponse<pb::SignalWorkflowRunResponse>, Status> {
701 let response = self
702 .provider
703 .signal_run(request.into_inner())
704 .await
705 .map_err(|error| rpc_status("workflow signal run", error))?;
706 Ok(GrpcResponse::new(response))
707 }
708
709 async fn signal_or_start_run(
710 &self,
711 request: GrpcRequest<pb::SignalOrStartWorkflowProviderRunRequest>,
712 ) -> std::result::Result<GrpcResponse<pb::SignalWorkflowRunResponse>, Status> {
713 let response = self
714 .provider
715 .signal_or_start_run(request.into_inner())
716 .await
717 .map_err(|error| rpc_status("workflow signal or start run", error))?;
718 Ok(GrpcResponse::new(response))
719 }
720
721 async fn deliver_event(
722 &self,
723 request: GrpcRequest<pb::DeliverWorkflowProviderEventRequest>,
724 ) -> std::result::Result<GrpcResponse<pb::WorkflowEvent>, Status> {
725 let event = self
726 .provider
727 .deliver_event(request.into_inner())
728 .await
729 .map_err(|error| rpc_status("workflow deliver event", error))?;
730 Ok(GrpcResponse::new(event))
731 }
732}
733
/// Inputs describing one workflow execution, as consumed by the evaluation
/// helpers in this module.
///
/// Only `input` and `signals` are read by the code in this file; the other
/// fields are carried along for callers.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct WorkflowExecutionRequest {
    /// Name of the provider (not interpreted in this module).
    pub provider_name: String,
    /// Identifier of the run (not interpreted in this module).
    pub run_id: String,
    /// Optional bound target (not interpreted in this module).
    pub target: Option<BoundWorkflowTarget>,
    /// Optional trigger (not interpreted in this module).
    pub trigger: Option<WorkflowRunTrigger>,
    /// JSON document that `input` values and `input...` template expressions
    /// are resolved against; when `None` such references are unresolved.
    pub input: Option<Value>,
    /// Optional JSON metadata (not interpreted in this module).
    pub metadata: Option<Value>,
    /// Signals for the run; the payload of the last element is what `signal`
    /// references are resolved against.
    pub signals: Vec<WorkflowSignal>,
}
752
/// Everything [`evaluate_workflow_value`] and [`render_workflow_template`]
/// need to resolve references.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct WorkflowEvalContext {
    /// Execution request supplying the run input and signals.
    pub request: WorkflowExecutionRequest,
    /// Step outputs keyed by step id; referencing an id that is absent from
    /// this map is reported as an error.
    pub outputs: BTreeMap<String, Value>,
    /// Step inputs keyed by step id; referencing an id that is absent from
    /// this map yields an unresolved result instead of an error.
    pub inputs: BTreeMap<String, Value>,
    /// When `false`, any step-input reference is rejected with an error.
    pub allow_inputs: bool,
}
765
/// Outcome of resolving a workflow value or path.
#[derive(Clone, Debug, PartialEq)]
pub struct WorkflowEvalResult {
    /// The resolved JSON value. `None` when unresolved, and also when a
    /// workflow value with no `kind` is evaluated (which counts as resolved).
    pub value: Option<Value>,
    /// `false` when the referenced data was not available (missing key,
    /// out-of-range index, absent input or signal payload, ...).
    pub resolved: bool,
}
774
775pub fn evaluate_workflow_value(
777 ctx: &WorkflowEvalContext,
778 value: &WorkflowValue,
779) -> Result<WorkflowEvalResult> {
780 match value.kind.as_ref() {
781 None => Ok(WorkflowEvalResult {
782 value: None,
783 resolved: true,
784 }),
785 Some(workflow_value::Kind::Literal(value)) => Ok(WorkflowEvalResult {
786 value: Some(protocol::json_from_value(value)),
787 resolved: true,
788 }),
789 Some(workflow_value::Kind::Object(object)) => {
790 let mut out = serde_json::Map::new();
791 for (key, nested) in &object.fields {
792 let resolved = evaluate_workflow_value(ctx, nested)?;
793 if !resolved.resolved {
794 return Ok(WorkflowEvalResult {
795 value: None,
796 resolved: false,
797 });
798 }
799 out.insert(key.clone(), resolved.value.unwrap_or(Value::Null));
800 }
801 Ok(WorkflowEvalResult {
802 value: Some(Value::Object(out)),
803 resolved: true,
804 })
805 }
806 Some(workflow_value::Kind::Array(array)) => {
807 let mut out = Vec::with_capacity(array.values.len());
808 for nested in &array.values {
809 let resolved = evaluate_workflow_value(ctx, nested)?;
810 if !resolved.resolved {
811 return Ok(WorkflowEvalResult {
812 value: None,
813 resolved: false,
814 });
815 }
816 out.push(resolved.value.unwrap_or(Value::Null));
817 }
818 Ok(WorkflowEvalResult {
819 value: Some(Value::Array(out)),
820 resolved: true,
821 })
822 }
823 Some(workflow_value::Kind::Template(text)) => Ok(WorkflowEvalResult {
824 value: Some(Value::String(render_workflow_template(
825 ctx,
826 &text.template,
827 )?)),
828 resolved: true,
829 }),
830 Some(workflow_value::Kind::Input(source)) => {
831 path_value_option(ctx.request.input.as_ref(), &source.path)
832 }
833 Some(workflow_value::Kind::Signal(source)) => latest_signal_payload(ctx)
834 .map(|payload| path_value(&payload, &source.path))
835 .unwrap_or_else(|| {
836 Ok(WorkflowEvalResult {
837 value: None,
838 resolved: false,
839 })
840 }),
841 Some(workflow_value::Kind::StepOutput(source)) => {
842 match ctx.outputs.get(source.step_id.trim()) {
843 Some(output) => path_value(output, &source.path),
844 None => Err(Error::bad_request(format!(
845 "workflow step output references missing step {:?}",
846 source.step_id.trim()
847 ))),
848 }
849 }
850 Some(workflow_value::Kind::StepInput(source)) => {
851 if !ctx.allow_inputs {
852 return Err(Error::bad_request(
853 "step input references are not allowed here",
854 ));
855 }
856 match ctx.inputs.get(source.step_id.trim()) {
857 Some(input) => path_value(input, &source.path),
858 None => Ok(WorkflowEvalResult {
859 value: None,
860 resolved: false,
861 }),
862 }
863 }
864 }
865}
866
867pub fn render_workflow_template(ctx: &WorkflowEvalContext, template: &str) -> Result<String> {
869 let mut out = String::new();
870 let mut i = 0;
871 while i < template.len() {
872 let remaining = &template[i..];
873 if remaining.starts_with("$${{") {
874 out.push_str("${{");
875 i += 4;
876 continue;
877 }
878 if !remaining.starts_with("${{") {
879 let ch = remaining.chars().next().expect("non-empty string");
880 out.push(ch);
881 i += ch.len_utf8();
882 continue;
883 }
884 let end = remaining[3..]
885 .find("}}")
886 .ok_or_else(|| Error::bad_request("unterminated template expression"))?;
887 let expr = remaining[3..3 + end].trim();
888 let resolved = template_expression_value(ctx, expr)?;
889 if !resolved.resolved {
890 return Err(Error::bad_request(format!(
891 "template expression {expr:?} did not resolve"
892 )));
893 }
894 out.push_str(&render_template_value(
895 resolved.value.unwrap_or(Value::Null),
896 )?);
897 i += 3 + end + 2;
898 }
899 Ok(out)
900}
901
/// Returns the last element of `signals`, or `None` when the slice is empty.
///
/// "Latest" here means last in slice order; no timestamp is consulted.
pub fn latest_workflow_signal(signals: &[WorkflowSignal]) -> Option<&WorkflowSignal> {
    signals.last()
}
906
907pub fn path_value(root: &Value, path: &str) -> Result<WorkflowEvalResult> {
909 if path.trim().is_empty() {
910 return Ok(WorkflowEvalResult {
911 value: Some(root.clone()),
912 resolved: true,
913 });
914 }
915 let mut current = root;
916 for segment in path_segments(path)? {
917 match (current, segment) {
918 (Value::Object(map), PathSegment::Key(key)) => match map.get(&key) {
919 Some(next) => current = next,
920 None => {
921 return Ok(WorkflowEvalResult {
922 value: None,
923 resolved: false,
924 });
925 }
926 },
927 (Value::Array(values), PathSegment::Index(index)) if index < values.len() => {
928 current = &values[index];
929 }
930 _ => {
931 return Ok(WorkflowEvalResult {
932 value: None,
933 resolved: false,
934 });
935 }
936 }
937 }
938 Ok(WorkflowEvalResult {
939 value: Some(current.clone()),
940 resolved: true,
941 })
942}
943
944fn path_value_option(root: Option<&Value>, path: &str) -> Result<WorkflowEvalResult> {
945 match root {
946 Some(root) => path_value(root, path),
947 None => Ok(WorkflowEvalResult {
948 value: None,
949 resolved: false,
950 }),
951 }
952}
953
954fn template_expression_value(ctx: &WorkflowEvalContext, expr: &str) -> Result<WorkflowEvalResult> {
955 if expr == "input" {
956 return path_value_option(ctx.request.input.as_ref(), "");
957 }
958 if let Some(path) = expr.strip_prefix("input.") {
959 return path_value_option(ctx.request.input.as_ref(), path);
960 }
961 if expr == "signal" {
962 return latest_signal_payload(ctx)
963 .map(|payload| path_value(&payload, ""))
964 .unwrap_or_else(|| {
965 Ok(WorkflowEvalResult {
966 value: None,
967 resolved: false,
968 })
969 });
970 }
971 if let Some(path) = expr.strip_prefix("signal.") {
972 return latest_signal_payload(ctx)
973 .map(|payload| path_value(&payload, path))
974 .unwrap_or_else(|| {
975 Ok(WorkflowEvalResult {
976 value: None,
977 resolved: false,
978 })
979 });
980 }
981 if let Some(path) = expr.strip_prefix("steps.") {
982 return step_expression_value(ctx, path);
983 }
984 Err(Error::bad_request(format!(
985 "unsupported template expression {expr:?}"
986 )))
987}
988
989fn step_expression_value(ctx: &WorkflowEvalContext, path: &str) -> Result<WorkflowEvalResult> {
990 let Some((step_id, rest)) = path.split_once('.') else {
991 return Err(Error::bad_request(format!(
992 "unsupported step template expression {path:?}"
993 )));
994 };
995 if let Some(path) = rest.strip_prefix("outputs") {
996 let path = path.strip_prefix('.').unwrap_or("");
997 return match ctx.outputs.get(step_id.trim()) {
998 Some(output) => path_value(output, path),
999 None => Err(Error::bad_request(format!(
1000 "workflow step output references missing step {:?}",
1001 step_id.trim()
1002 ))),
1003 };
1004 }
1005 if let Some(path) = rest.strip_prefix("inputs") {
1006 if !ctx.allow_inputs {
1007 return Err(Error::bad_request(
1008 "step input references are not allowed here",
1009 ));
1010 }
1011 let path = path.strip_prefix('.').unwrap_or("");
1012 return match ctx.inputs.get(step_id.trim()) {
1013 Some(input) => path_value(input, path),
1014 None => Ok(WorkflowEvalResult {
1015 value: None,
1016 resolved: false,
1017 }),
1018 };
1019 }
1020 Err(Error::bad_request(format!(
1021 "unsupported step template expression {path:?}"
1022 )))
1023}
1024
1025fn latest_signal_payload(ctx: &WorkflowEvalContext) -> Option<Value> {
1026 latest_workflow_signal(&ctx.request.signals)
1027 .and_then(|signal| signal.payload.as_ref())
1028 .map(protocol::json_from_struct)
1029}
1030
1031fn render_template_value(value: Value) -> Result<String> {
1032 match value {
1033 Value::String(value) => Ok(value),
1034 other => Ok(serde_json::to_string(&other)?),
1035 }
1036}
1037
/// One step of a parsed workflow path.
#[derive(Debug)]
enum PathSegment {
    /// Object member name, from a bare `name` or a quoted `['name']` segment.
    Key(String),
    /// Array position, from an unquoted `[n]` segment.
    Index(usize),
}
1043
1044fn path_segments(path: &str) -> Result<Vec<PathSegment>> {
1045 let chars: Vec<char> = path.trim().chars().collect();
1046 let mut out = Vec::new();
1047 let mut i = 0;
1048 while i < chars.len() {
1049 match chars[i] {
1050 '.' => i += 1,
1051 '[' => {
1052 let start = i + 1;
1053 let mut end = start;
1054 while end < chars.len() && chars[end] != ']' {
1055 end += 1;
1056 }
1057 if end >= chars.len() {
1058 return Err(Error::bad_request(format!(
1059 "invalid workflow path {path:?}"
1060 )));
1061 }
1062 let token: String = chars[start..end].iter().collect();
1063 let token = token.trim();
1064 if token.starts_with('"') || token.starts_with('\'') {
1065 out.push(PathSegment::Key(unquote_path_key(token, path)?));
1066 } else {
1067 out.push(PathSegment::Index(token.parse().map_err(|_| {
1068 Error::bad_request(format!("invalid workflow path {path:?}"))
1069 })?));
1070 }
1071 i = end + 1;
1072 }
1073 _ => {
1074 let start = i;
1075 while i < chars.len() && chars[i] != '.' && chars[i] != '[' {
1076 i += 1;
1077 }
1078 let key: String = chars[start..i].iter().collect::<String>().trim().to_owned();
1079 if key.is_empty() {
1080 return Err(Error::bad_request(format!(
1081 "invalid workflow path {path:?}"
1082 )));
1083 }
1084 out.push(PathSegment::Key(key));
1085 }
1086 }
1087 }
1088 Ok(out)
1089}
1090
1091fn unquote_path_key(token: &str, path: &str) -> Result<String> {
1092 if token.starts_with('"') {
1093 return serde_json::from_str(token)
1094 .map_err(|_| Error::bad_request(format!("invalid workflow path {path:?}")));
1095 }
1096 if token.len() < 2 || !token.ends_with('\'') {
1097 return Err(Error::bad_request(format!(
1098 "invalid workflow path {path:?}"
1099 )));
1100 }
1101 let mut out = String::new();
1102 let mut chars = token[1..token.len() - 1].chars();
1103 while let Some(ch) = chars.next() {
1104 if ch != '\\' {
1105 out.push(ch);
1106 continue;
1107 }
1108 let escaped = chars
1109 .next()
1110 .ok_or_else(|| Error::bad_request(format!("invalid workflow path {path:?}")))?;
1111 match escaped {
1112 '\'' | '"' | '\\' => out.push(escaped),
1113 'n' => out.push('\n'),
1114 'r' => out.push('\r'),
1115 't' => out.push('\t'),
1116 'u' => {
1117 let mut hex = String::new();
1118 for _ in 0..4 {
1119 hex.push(chars.next().ok_or_else(|| {
1120 Error::bad_request(format!("invalid workflow path {path:?}"))
1121 })?);
1122 }
1123 let code = u32::from_str_radix(&hex, 16)
1124 .map_err(|_| Error::bad_request(format!("invalid workflow path {path:?}")))?;
1125 let value = char::from_u32(code)
1126 .ok_or_else(|| Error::bad_request(format!("invalid workflow path {path:?}")))?;
1127 out.push(value);
1128 }
1129 other => out.push(other),
1130 }
1131 }
1132 Ok(out)
1133}
1134
#[cfg(test)]
mod tests {
    use serde_json::json;

    use crate::protocol;
    use crate::workflow_provider::{
        WorkflowEvalContext, WorkflowExecutionRequest, WorkflowSignal, evaluate_workflow_value,
        path_value, render_workflow_template, workflow_value_input,
    };

    /// End-to-end check of template rendering, value evaluation and quoted
    /// path lookup against one shared context.
    #[test]
    fn evaluates_current_templates_and_paths() {
        // Context with a run input, one signal payload, one step input and
        // one step output, so every expression root can be exercised.
        let ctx = WorkflowEvalContext {
            request: WorkflowExecutionRequest {
                provider_name: "indexeddb".to_owned(),
                run_id: "run-1".to_owned(),
                input: Some(json!({"customer": {"id": "cust_1"}})),
                signals: vec![WorkflowSignal {
                    id: "sig-1".to_owned(),
                    payload: Some(
                        protocol::struct_from_json(json!({"thread": {"ts": "123.456"}})).unwrap(),
                    ),
                    ..Default::default()
                }],
                ..Default::default()
            },
            inputs: [("draft".to_owned(), json!({"title": "PR"}))].into(),
            outputs: [("diagnosis".to_owned(), json!({"action": "open_pr"}))].into(),
            allow_inputs: true,
        };

        // Covers input, signal, step output and step input expressions plus
        // the `$${{` escape, which must come out as a literal `${{`.
        assert_eq!(
            render_workflow_template(
                &ctx,
                "customer=${{ input.customer.id }}; thread=${{ signal.thread.ts }}; action=${{ steps.diagnosis.outputs.action }}; title=${{ steps.draft.inputs.title }}; literal=$${{x}}",
            )
            .unwrap(),
            "customer=cust_1; thread=123.456; action=open_pr; title=PR; literal=${{x}}"
        );
        // An input reference resolves a dotted path against the run input.
        assert_eq!(
            evaluate_workflow_value(&ctx, &workflow_value_input("customer.id"))
                .unwrap()
                .value,
            Some(json!("cust_1"))
        );
        // A single-quoted bracket key may contain an escaped quote.
        assert_eq!(
            path_value(
                &json!({"quote'key": {"value": 42}}),
                "['quote\\'key'].value"
            )
            .unwrap()
            .value,
            Some(json!(42))
        );
    }
}