1use serde::de::DeserializeOwned;
2use serde::{Deserialize, Serialize};
3use serde_json::{Value, json};
4use strum::IntoEnumIterator;
5
6use crate::store::WorkGraphEventFilter;
7use crate::types::{
8 AddEvidenceRequest, ClaimWorkItemRequest, CloseWorkItemRequest, LinkWorkItemsRequest,
9 ReadyWorkFilter, ReleaseWorkItemRequest, UpdateWorkItemRequest, WorkGraphSnapshotFilter,
10 WorkItemFilter, WorkItemId, WorkNamespace,
11};
12use crate::{CreateWorkItemRequest, WorkGraphError, WorkGraphService};
13
/// Machine-readable category carried by every `WorkGraphToolError`.
///
/// Serde renders each variant in `snake_case`; `as_str` returns the same spelling.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkGraphToolErrorCode {
    /// Tool arguments failed to deserialize, or the service returned
    /// `WorkGraphError::InvalidInput` / `WorkGraphError::InvalidTimestampMillis`.
    InvalidArguments,
    /// The tool name is unknown, or the service returned
    /// `WorkGraphError::NotFound` / `WorkGraphError::AttentionNotFound`.
    NotFound,
    /// The service returned `WorkGraphError::UnsupportedBackend`.
    CapabilityUnavailable,
    /// The service returned `WorkGraphError::StaleRevision` or `WorkGraphError::Conflict`.
    Conflict,
    /// The service returned `WorkGraphError::InvalidTransition`.
    InvalidTransition,
    /// The service returned `WorkGraphError::Store`.
    StoreError,
    /// NOTE(review): nothing in this file produces this code — confirm its intended use.
    InternalError,
}
32
33impl WorkGraphToolErrorCode {
34 pub const fn as_str(self) -> &'static str {
38 match self {
39 Self::InvalidArguments => "invalid_arguments",
40 Self::NotFound => "not_found",
41 Self::CapabilityUnavailable => "capability_unavailable",
42 Self::Conflict => "conflict",
43 Self::InvalidTransition => "invalid_transition",
44 Self::StoreError => "store_error",
45 Self::InternalError => "internal_error",
46 }
47 }
48}
49
50impl std::fmt::Display for WorkGraphToolErrorCode {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 f.write_str(self.as_str())
53 }
54}
55
/// Error returned by the WorkGraph tool entry points in this file.
///
/// Serializes as an object with the two fields below.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkGraphToolError {
    /// Category of the failure.
    pub code: WorkGraphToolErrorCode,
    /// Free-form description of the failure.
    pub message: String,
}
61
62impl WorkGraphToolError {
63 fn new(code: WorkGraphToolErrorCode, message: impl Into<String>) -> Self {
64 Self {
65 code,
66 message: message.into(),
67 }
68 }
69}
70
/// The set of WorkGraph tools this module advertises and dispatches.
///
/// `strum::EnumIter` yields variants in declaration order, so this order is
/// also the order of the entries returned by `workgraph_tools_list`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, strum::EnumIter)]
enum WorkGraphToolContract {
    /// `workgraph_create`
    Create,
    /// `workgraph_get`
    Get,
    /// `workgraph_list`
    List,
    /// `workgraph_ready`
    Ready,
    /// `workgraph_snapshot`
    Snapshot,
    /// `workgraph_events`
    Events,
    /// `workgraph_claim`
    Claim,
    /// `workgraph_release`
    Release,
    /// `workgraph_update`
    Update,
    /// `workgraph_block`
    Block,
    /// `workgraph_close`
    Close,
    /// `workgraph_link`
    Link,
    /// `workgraph_add_evidence`
    AddEvidence,
}
95
96impl WorkGraphToolContract {
97 const fn name(self) -> &'static str {
98 match self {
99 Self::Create => "workgraph_create",
100 Self::Get => "workgraph_get",
101 Self::List => "workgraph_list",
102 Self::Ready => "workgraph_ready",
103 Self::Snapshot => "workgraph_snapshot",
104 Self::Events => "workgraph_events",
105 Self::Claim => "workgraph_claim",
106 Self::Release => "workgraph_release",
107 Self::Update => "workgraph_update",
108 Self::Block => "workgraph_block",
109 Self::Close => "workgraph_close",
110 Self::Link => "workgraph_link",
111 Self::AddEvidence => "workgraph_add_evidence",
112 }
113 }
114
115 const fn description(self) -> &'static str {
116 match self {
117 Self::Create => "Create a durable WorkGraph item.",
118 Self::Get => "Read one WorkGraph item.",
119 Self::List => "List WorkGraph items.",
120 Self::Ready => "List ready, claimable WorkGraph items.",
121 Self::Snapshot => "Read a WorkGraph observability snapshot.",
122 Self::Events => "Read WorkGraph event history.",
123 Self::Claim => "Claim a ready WorkGraph item with CAS revision checking.",
124 Self::Release => "Release a claimed WorkGraph item.",
125 Self::Update => "Update non-terminal WorkGraph item fields.",
126 Self::Block => "Mark a WorkGraph item blocked.",
127 Self::Close => "Close a WorkGraph item with a terminal status.",
128 Self::Link => "Create a dependency or relationship edge.",
129 Self::AddEvidence => "Attach a typed evidence reference to a WorkGraph item.",
130 }
131 }
132
133 fn schema(self) -> Value {
134 match self {
135 Self::Create => create_schema(),
136 Self::Get => id_schema(false),
137 Self::List => list_schema(),
138 Self::Ready => ready_schema(),
139 Self::Snapshot => snapshot_schema(),
140 Self::Events => events_schema(),
141 Self::Claim => claim_schema(),
142 Self::Release | Self::Block => revision_id_schema(),
143 Self::Update => update_schema(),
144 Self::Close => close_schema(),
145 Self::Link => link_schema(),
146 Self::AddEvidence => evidence_schema(),
147 }
148 }
149
150 fn parse(name: &str) -> Result<Self, WorkGraphToolError> {
151 Self::iter()
152 .find(|contract| contract.name() == name)
153 .ok_or_else(|| {
154 WorkGraphToolError::new(
155 WorkGraphToolErrorCode::NotFound,
156 format!("unknown WorkGraph tool '{name}'"),
157 )
158 })
159 }
160}
161
162pub fn workgraph_tools_list() -> Vec<Value> {
163 WorkGraphToolContract::iter()
164 .map(|contract| tool(contract.name(), contract.description(), contract.schema()))
165 .collect()
166}
167
168pub async fn handle_workgraph_tools_call(
169 service: &WorkGraphService,
170 name: &str,
171 arguments: &Value,
172) -> Result<Value, WorkGraphToolError> {
173 match WorkGraphToolContract::parse(name)? {
174 WorkGraphToolContract::Create => {
175 let request: CreateWorkItemRequest = parse(arguments)?;
176 service
177 .create(request)
178 .await
179 .map(|item| json!({ "item": item }))
180 .map_err(map_error)
181 }
182 WorkGraphToolContract::Get => {
183 let request: IdParams = parse(arguments)?;
184 service
185 .get(request.realm_id, request.namespace, request.id)
186 .await
187 .map(|item| json!({ "item": item }))
188 .map_err(map_error)
189 }
190 WorkGraphToolContract::List => {
191 let filter: WorkItemFilter = parse(arguments)?;
192 service
193 .list(filter)
194 .await
195 .map(|items| json!({ "items": items }))
196 .map_err(map_error)
197 }
198 WorkGraphToolContract::Ready => {
199 let filter: ReadyWorkFilter = parse(arguments)?;
200 service
201 .ready(filter)
202 .await
203 .map(|items| json!({ "items": items }))
204 .map_err(map_error)
205 }
206 WorkGraphToolContract::Snapshot => {
207 let filter: WorkGraphSnapshotFilter = parse(arguments)?;
208 service
209 .snapshot(filter)
210 .await
211 .map(|snapshot| json!({ "snapshot": snapshot }))
212 .map_err(map_error)
213 }
214 WorkGraphToolContract::Claim => {
215 let request: ClaimWorkItemRequest = parse(arguments)?;
216 service
217 .claim(request)
218 .await
219 .map(|item| json!({ "item": item }))
220 .map_err(map_error)
221 }
222 WorkGraphToolContract::Release => {
223 let request: ReleaseWorkItemRequest = parse(arguments)?;
224 service
225 .release(request)
226 .await
227 .map(|item| json!({ "item": item }))
228 .map_err(map_error)
229 }
230 WorkGraphToolContract::Update => {
231 let request: UpdateWorkItemRequest = parse(arguments)?;
232 service
233 .update(request)
234 .await
235 .map(|item| json!({ "item": item }))
236 .map_err(map_error)
237 }
238 WorkGraphToolContract::Block => {
239 let request: RevisionIdParams = parse(arguments)?;
240 service
241 .block(
242 request.realm_id,
243 request.namespace,
244 request.id,
245 request.expected_revision,
246 )
247 .await
248 .map(|item| json!({ "item": item }))
249 .map_err(map_error)
250 }
251 WorkGraphToolContract::Close => {
252 let request: CloseWorkItemRequest = parse(arguments)?;
253 service
254 .close(request)
255 .await
256 .map(|item| json!({ "item": item }))
257 .map_err(map_error)
258 }
259 WorkGraphToolContract::Link => {
260 let request: LinkWorkItemsRequest = parse(arguments)?;
261 service
262 .link(request)
263 .await
264 .map(|edge| json!({ "edge": edge }))
265 .map_err(map_error)
266 }
267 WorkGraphToolContract::AddEvidence => {
268 let request: AddEvidenceRequest = parse(arguments)?;
269 service
270 .add_evidence(request)
271 .await
272 .map(|item| json!({ "item": item }))
273 .map_err(map_error)
274 }
275 WorkGraphToolContract::Events => {
276 let filter: WorkGraphEventFilterParams = parse(arguments)?;
277 service
278 .events(filter.into())
279 .await
280 .map(|events| json!({ "events": events }))
281 .map_err(map_error)
282 }
283 }
284}
285
/// Arguments deserialized for the `workgraph_get` tool.
#[derive(Debug, Deserialize)]
struct IdParams {
    /// Identifier of the work item (required).
    id: WorkItemId,
    /// Optional realm; `None` when omitted. How the service treats `None` is
    /// not visible in this file.
    #[serde(default)]
    realm_id: Option<String>,
    /// Optional namespace; `None` when omitted.
    #[serde(default)]
    namespace: Option<WorkNamespace>,
}
294
/// Arguments deserialized for the `workgraph_block` tool: an item id plus the
/// revision the caller expects the item to be at.
#[derive(Debug, Deserialize)]
struct RevisionIdParams {
    /// Identifier of the work item (required).
    id: WorkItemId,
    /// Revision passed through to the service call (required).
    expected_revision: u64,
    /// Optional realm; `None` when omitted.
    #[serde(default)]
    realm_id: Option<String>,
    /// Optional namespace; `None` when omitted.
    #[serde(default)]
    namespace: Option<WorkNamespace>,
}
304
/// Arguments deserialized for the `workgraph_events` tool.
///
/// Every field is optional and is copied one-to-one into `WorkGraphEventFilter`
/// by the `From` impl below.
/// NOTE(review): presumably exists because `WorkGraphEventFilter` is not
/// itself deserializable — confirm.
#[derive(Debug, Deserialize)]
struct WorkGraphEventFilterParams {
    /// Optional realm; `None` when omitted.
    #[serde(default)]
    realm_id: Option<String>,
    /// Optional namespace; `None` when omitted.
    #[serde(default)]
    namespace: Option<WorkNamespace>,
    /// Defaults to `false` when omitted.
    #[serde(default)]
    all_namespaces: bool,
    /// Optional sequence cursor; `None` when omitted.
    #[serde(default)]
    after_seq: Option<i64>,
    /// Optional result limit; `None` when omitted.
    #[serde(default)]
    limit: Option<usize>,
}
318
319impl From<WorkGraphEventFilterParams> for WorkGraphEventFilter {
320 fn from(value: WorkGraphEventFilterParams) -> Self {
321 Self {
322 realm_id: value.realm_id,
323 namespace: value.namespace,
324 all_namespaces: value.all_namespaces,
325 after_seq: value.after_seq,
326 limit: value.limit,
327 }
328 }
329}
330
331fn parse<T: DeserializeOwned>(arguments: &Value) -> Result<T, WorkGraphToolError> {
332 serde_json::from_value(arguments.clone()).map_err(|err| {
333 WorkGraphToolError::new(
334 WorkGraphToolErrorCode::InvalidArguments,
335 format!("invalid WorkGraph arguments: {err}"),
336 )
337 })
338}
339
340fn map_error(error: WorkGraphError) -> WorkGraphToolError {
341 let code = match error {
342 WorkGraphError::NotFound { .. } | WorkGraphError::AttentionNotFound { .. } => {
343 WorkGraphToolErrorCode::NotFound
344 }
345 WorkGraphError::StaleRevision { .. } | WorkGraphError::Conflict(_) => {
346 WorkGraphToolErrorCode::Conflict
347 }
348 WorkGraphError::InvalidTransition(_) => WorkGraphToolErrorCode::InvalidTransition,
349 WorkGraphError::InvalidInput(_) | WorkGraphError::InvalidTimestampMillis { .. } => {
350 WorkGraphToolErrorCode::InvalidArguments
351 }
352 WorkGraphError::UnsupportedBackend(_) => WorkGraphToolErrorCode::CapabilityUnavailable,
353 WorkGraphError::Store(_) => WorkGraphToolErrorCode::StoreError,
354 };
355 WorkGraphToolError::new(code, error.to_string())
356}
357
358fn tool(name: &str, description: &str, schema: Value) -> Value {
359 json!({
360 "name": name,
361 "description": description,
362 "inputSchema": schema,
363 })
364}
365
366fn base_properties() -> serde_json::Map<String, Value> {
367 serde_json::Map::from_iter([
368 ("realm_id".to_string(), json!({ "type": "string" })),
369 ("namespace".to_string(), json!({ "type": "string" })),
370 ])
371}
372
373fn external_ref_schema() -> Value {
374 json!({
375 "type": "object",
376 "properties": {
377 "kind": { "type": "string" },
378 "id": { "type": "string" },
379 "url": { "type": "string" }
380 },
381 "required": ["kind", "id"],
382 "additionalProperties": false
383 })
384}
385
386fn evidence_ref_schema() -> Value {
387 json!({
388 "type": "object",
389 "properties": {
390 "kind": { "type": "string" },
391 "id": { "type": "string" },
392 "label": { "type": "string" },
393 "summary": { "type": "string" }
394 },
395 "required": ["kind", "id"],
396 "additionalProperties": false
397 })
398}
399
400fn object(properties: serde_json::Map<String, Value>, required: &[&str]) -> Value {
401 json!({
402 "type": "object",
403 "properties": properties,
404 "required": required,
405 "additionalProperties": false,
406 })
407}
408
409fn id_schema(include_revision: bool) -> Value {
410 let mut properties = base_properties();
411 properties.insert("id".to_string(), json!({ "type": "string" }));
412 if include_revision {
413 properties.insert(
414 "expected_revision".to_string(),
415 json!({ "type": "integer", "minimum": 0 }),
416 );
417 object(properties, &["id", "expected_revision"])
418 } else {
419 object(properties, &["id"])
420 }
421}
422
423fn revision_id_schema() -> Value {
424 id_schema(true)
425}
426
427fn create_schema() -> Value {
428 let mut properties = base_properties();
429 properties.extend([
430 ("title".to_string(), json!({ "type": "string" })),
431 ("description".to_string(), json!({ "type": "string" })),
432 (
433 "priority".to_string(),
434 json!({ "type": "string", "enum": ["low", "medium", "high"] }),
435 ),
436 (
437 "labels".to_string(),
438 json!({ "type": "array", "items": { "type": "string" } }),
439 ),
440 (
441 "due_at".to_string(),
442 json!({ "type": "string", "format": "date-time" }),
443 ),
444 (
445 "not_before".to_string(),
446 json!({ "type": "string", "format": "date-time" }),
447 ),
448 (
449 "snoozed_until".to_string(),
450 json!({ "type": "string", "format": "date-time" }),
451 ),
452 (
453 "status".to_string(),
454 json!({ "type": "string", "enum": ["open", "blocked"] }),
455 ),
456 (
457 "external_refs".to_string(),
458 json!({ "type": "array", "items": external_ref_schema() }),
459 ),
460 (
461 "evidence_refs".to_string(),
462 json!({ "type": "array", "items": evidence_ref_schema() }),
463 ),
464 ]);
465 object(properties, &["title"])
466}
467
468fn list_schema() -> Value {
469 let mut properties = base_properties();
470 properties.extend([
471 ("all_namespaces".to_string(), json!({ "type": "boolean" })),
472 (
473 "statuses".to_string(),
474 json!({ "type": "array", "items": { "type": "string" } }),
475 ),
476 (
477 "labels".to_string(),
478 json!({ "type": "array", "items": { "type": "string" } }),
479 ),
480 ("include_terminal".to_string(), json!({ "type": "boolean" })),
481 (
482 "limit".to_string(),
483 json!({ "type": "integer", "minimum": 1 }),
484 ),
485 ]);
486 object(properties, &[])
487}
488
489fn ready_schema() -> Value {
490 let mut properties = base_properties();
491 properties.extend([
492 (
493 "labels".to_string(),
494 json!({ "type": "array", "items": { "type": "string" } }),
495 ),
496 (
497 "limit".to_string(),
498 json!({ "type": "integer", "minimum": 1 }),
499 ),
500 ]);
501 object(properties, &[])
502}
503
504fn snapshot_schema() -> Value {
505 list_schema()
506}
507
508fn events_schema() -> Value {
509 let mut properties = base_properties();
510 properties.extend([
511 ("all_namespaces".to_string(), json!({ "type": "boolean" })),
512 (
513 "after_seq".to_string(),
514 json!({ "type": "integer", "minimum": 0 }),
515 ),
516 (
517 "limit".to_string(),
518 json!({ "type": "integer", "minimum": 1 }),
519 ),
520 ]);
521 object(properties, &[])
522}
523
524fn claim_schema() -> Value {
525 let mut properties = base_properties();
526 properties.extend([
527 ("id".to_string(), json!({ "type": "string" })),
528 (
529 "expected_revision".to_string(),
530 json!({ "type": "integer", "minimum": 0 }),
531 ),
532 (
533 "owner".to_string(),
534 json!({
535 "type": "object",
536 "properties": {
537 "key": {
538 "type": "object",
539 "properties": {
540 "kind": {
541 "type": "string",
542 "enum": ["principal", "agent", "session", "mob", "label"]
543 },
544 "id": { "type": "string" }
545 },
546 "required": ["kind", "id"],
547 "additionalProperties": false
548 },
549 "display_name": { "type": "string" }
550 },
551 "required": ["key"],
552 "additionalProperties": false
553 }),
554 ),
555 (
556 "lease_seconds".to_string(),
557 json!({ "type": "integer", "minimum": 1 }),
558 ),
559 (
560 "lease_expires_at".to_string(),
561 json!({ "type": "string", "format": "date-time" }),
562 ),
563 ]);
564 object(properties, &["id", "expected_revision", "owner"])
565}
566
567fn update_schema() -> Value {
568 let mut properties = base_properties();
569 properties.extend([
570 ("id".to_string(), json!({ "type": "string" })),
571 (
572 "expected_revision".to_string(),
573 json!({ "type": "integer", "minimum": 0 }),
574 ),
575 ("title".to_string(), json!({ "type": "string" })),
576 ("description".to_string(), json!({ "type": "string" })),
577 (
578 "priority".to_string(),
579 json!({ "type": "string", "enum": ["low", "medium", "high"] }),
580 ),
581 (
582 "labels".to_string(),
583 json!({ "type": "array", "items": { "type": "string" } }),
584 ),
585 (
586 "due_at".to_string(),
587 json!({ "type": "string", "format": "date-time" }),
588 ),
589 (
590 "not_before".to_string(),
591 json!({ "type": "string", "format": "date-time" }),
592 ),
593 (
594 "snoozed_until".to_string(),
595 json!({ "type": "string", "format": "date-time" }),
596 ),
597 (
598 "external_refs".to_string(),
599 json!({ "type": "array", "items": external_ref_schema() }),
600 ),
601 ]);
602 object(properties, &["id", "expected_revision"])
603}
604
605fn close_schema() -> Value {
606 let mut properties = base_properties();
607 properties.extend([
608 ("id".to_string(), json!({ "type": "string" })),
609 (
610 "expected_revision".to_string(),
611 json!({ "type": "integer", "minimum": 0 }),
612 ),
613 (
614 "status".to_string(),
615 json!({ "type": "string", "enum": ["completed", "cancelled", "failed"] }),
616 ),
617 ]);
618 object(properties, &["id", "expected_revision"])
619}
620
621fn link_schema() -> Value {
622 let mut properties = base_properties();
623 properties.extend([
624 (
625 "kind".to_string(),
626 json!({
627 "type": "string",
628 "enum": ["blocks", "parent", "related", "supersedes", "derived_from"]
629 }),
630 ),
631 ("from_id".to_string(), json!({ "type": "string" })),
632 ("to_id".to_string(), json!({ "type": "string" })),
633 ]);
634 object(properties, &["kind", "from_id", "to_id"])
635}
636
637fn evidence_schema() -> Value {
638 let mut properties = base_properties();
639 properties.extend([
640 ("id".to_string(), json!({ "type": "string" })),
641 (
642 "expected_revision".to_string(),
643 json!({ "type": "integer", "minimum": 0 }),
644 ),
645 ("evidence".to_string(), evidence_ref_schema()),
646 ]);
647 object(properties, &["id", "expected_revision", "evidence"])
648}
649
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use std::collections::BTreeSet;
    use std::sync::Arc;

    use serde_json::json;

    use crate::{MemoryWorkGraphStore, WorkGraphService, WorkNamespace};

    use super::*;

    /// Hand-maintained list of tool names the catalog is expected to expose.
    /// It is deliberately spelled out rather than derived from the enum so the
    /// drift test below has something independent to compare against.
    const CANONICAL_WORKGRAPH_TOOL_NAMES: &[&str] = &[
        "workgraph_create",
        "workgraph_get",
        "workgraph_list",
        "workgraph_ready",
        "workgraph_snapshot",
        "workgraph_events",
        "workgraph_claim",
        "workgraph_release",
        "workgraph_update",
        "workgraph_block",
        "workgraph_close",
        "workgraph_link",
        "workgraph_add_evidence",
    ];

    /// Creates an item through the tool surface and expects it to come back
    /// as the first entry of a label-filtered `workgraph_ready` call.
    #[tokio::test]
    async fn workgraph_tools_create_and_ready_round_trip() {
        let service = WorkGraphService::with_scope(
            Arc::new(MemoryWorkGraphStore::new()),
            "realm",
            WorkNamespace::default(),
        );

        let create_args = json!({ "title": "tool item", "labels": ["a"] });
        let created = handle_workgraph_tools_call(&service, "workgraph_create", &create_args)
            .await
            .expect("create");
        let created_id = created["item"]["id"].as_str().expect("id").to_string();

        let ready_args = json!({ "labels": ["a"] });
        let ready = handle_workgraph_tools_call(&service, "workgraph_ready", &ready_args)
            .await
            .expect("ready");
        assert_eq!(
            ready["items"][0]["id"].as_str(),
            Some(created_id.as_str())
        );
    }

    /// The enum-derived catalog, the advertised tool list and the dispatcher
    /// must all agree with the canonical name list.
    #[test]
    fn workgraph_tool_catalog_matches_canonical_operation_set_without_drift() {
        let canonical: BTreeSet<String> = CANONICAL_WORKGRAPH_TOOL_NAMES
            .iter()
            .map(|name| (*name).to_string())
            .collect();
        assert_eq!(
            canonical.len(),
            CANONICAL_WORKGRAPH_TOOL_NAMES.len(),
            "canonical WorkGraph operation names must be unique"
        );

        let catalog: BTreeSet<String> = WorkGraphToolContract::iter()
            .map(|contract| contract.name().to_string())
            .collect();
        assert_eq!(
            catalog.len(),
            WorkGraphToolContract::iter().count(),
            "WorkGraphToolContract variants must not share operation names"
        );
        assert_eq!(
            catalog, canonical,
            "derived WorkGraphToolContract catalog drifted from the canonical operation set"
        );

        let mut advertised = BTreeSet::new();
        for descriptor in workgraph_tools_list() {
            if let Some(name) = descriptor["name"].as_str() {
                advertised.insert(name.to_string());
            }
        }
        assert_eq!(
            advertised, canonical,
            "advertised WorkGraph tool list drifted from the canonical operation set"
        );

        for &name in CANONICAL_WORKGRAPH_TOOL_NAMES {
            let contract = WorkGraphToolContract::parse(name)
                .expect("canonical WorkGraph operation must be dispatchable");
            assert_eq!(
                contract.name(),
                name,
                "dispatch round-trip changed the operation name for {name}"
            );
        }

        let rejected = WorkGraphToolContract::parse("workgraph_not_a_real_tool")
            .expect_err("dispatch must reject operations outside the catalog");
        assert_eq!(rejected.code, WorkGraphToolErrorCode::NotFound);
    }

    /// Every `array` schema must declare `items` and every `object` schema
    /// must declare `properties`, at any nesting depth.
    #[test]
    fn workgraph_tool_schemas_do_not_expose_bare_arrays_or_objects() {
        /// Recursively walks `schema`, using `path` to label assertion failures.
        fn assert_schema_is_provider_safe(path: &str, schema: &Value) {
            if let Value::Object(map) = schema {
                let declared_type = map.get("type").and_then(Value::as_str);

                if declared_type == Some("array") {
                    assert!(
                        map.contains_key("items"),
                        "{path} is an array schema without items"
                    );
                }
                if declared_type == Some("object") {
                    assert!(
                        map.contains_key("properties"),
                        "{path} is an object schema without properties"
                    );
                }

                for (key, nested) in map {
                    assert_schema_is_provider_safe(&format!("{path}.{key}"), nested);
                }
            } else if let Value::Array(elements) = schema {
                for (index, nested) in elements.iter().enumerate() {
                    assert_schema_is_provider_safe(&format!("{path}[{index}]"), nested);
                }
            }
        }

        for descriptor in workgraph_tools_list() {
            let name = descriptor["name"].as_str().expect("tool name");
            assert_schema_is_provider_safe(name, &descriptor["inputSchema"]);
        }
    }
}