use crate::EventKind;
use crate::provider_validation::{Event, ExecutionMetadata, create_instance};
use crate::provider_validations::ProviderFactory;
use crate::providers::{PruneOptions, WorkItem};
use std::time::Duration;
/// Builds an `ExternalRaised` work item named `"poke"` carrying an empty JSON
/// object (`"{}"`) as data, addressed to `instance`.
fn poke_item(instance: &str) -> WorkItem {
    let target = String::from(instance);
    let name = String::from("poke");
    let data = String::from("{}");
    WorkItem::ExternalRaised {
        instance: target,
        name,
        data,
    }
}
/// Drives one enqueue / fetch / ack cycle so that `history_delta` is handed to
/// the provider.
///
/// A poke item is enqueued for `instance`, the next orchestration item is
/// fetched, and its lock token is acked with `execution_id` and
/// `history_delta`. Every other ack argument is empty or default. The fetched
/// item itself is discarded; only the lock token is used.
///
/// # Panics
///
/// Panics if any provider call fails or if the fetch returns no item.
async fn ack_with_delta(
    provider: &dyn crate::providers::Provider,
    instance: &str,
    execution_id: u64,
    history_delta: Vec<Event>,
) {
    let poke = poke_item(instance);
    provider.enqueue_for_orchestrator(poke, None).await.unwrap();

    let fetched = provider
        .fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
        .await
        .unwrap();
    let (_, lock_token, _) = fetched.expect("expected orchestration item");

    let metadata = ExecutionMetadata::default();
    provider
        .ack_orchestration_item(
            &lock_token,
            execution_id,
            history_delta,
            vec![],
            vec![],
            metadata,
            vec![],
        )
        .await
        .unwrap();
}
/// Acks a single `KeyValueSet` event and asserts that `get_kv_value` returns
/// the value that was written.
pub async fn test_kv_set_and_get<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-set";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let set_counter = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: "counter".into(),
            value: "42".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![set_counter]).await;

    let stored = provider.get_kv_value(INSTANCE, "counter").await.unwrap();
    assert_eq!(stored.as_deref(), Some("42"));
}
pub async fn test_kv_overwrite<F: ProviderFactory>(factory: &F) {
let provider = factory.create_provider().await;
create_instance(&*provider, "kv-over").await.unwrap();
ack_with_delta(
&*provider,
"kv-over",
1,
vec![Event::with_event_id(
100,
"kv-over",
1,
None,
EventKind::KeyValueSet {
key: "status".to_string(),
value: "old".to_string(),
last_updated_at_ms: 0,
},
)],
)
.await;
let val = provider.get_kv_value("kv-over", "status").await.unwrap();
assert_eq!(val, Some("old".to_string()), "initial value should be 'old'");
ack_with_delta(
&*provider,
"kv-over",
1,
vec![Event::with_event_id(
101,
"kv-over",
1,
None,
EventKind::KeyValueSet {
key: "status".to_string(),
value: "new".to_string(),
last_updated_at_ms: 0,
},
)],
)
.await;
let val = provider.get_kv_value("kv-over", "status").await.unwrap();
assert_eq!(val, Some("new".to_string()));
}
/// Sets a key, confirms it is readable, then acks a `KeyValueCleared` event for
/// that key and asserts the read returns `None`.
pub async fn test_kv_clear_single<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-clr1";
    const KEY: &str = "remove_me";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let write = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "x".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![write]).await;

    let before = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(before.as_deref(), Some("x"), "value should be set before clearing");

    let clear = Event::with_event_id(
        101,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueCleared { key: KEY.into() },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![clear]).await;

    let after = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(after, None);
}
pub async fn test_kv_clear_all<F: ProviderFactory>(factory: &F) {
let provider = factory.create_provider().await;
create_instance(&*provider, "kv-clra").await.unwrap();
ack_with_delta(
&*provider,
"kv-clra",
1,
vec![
Event::with_event_id(
100,
"kv-clra",
1,
None,
EventKind::KeyValueSet {
key: "a".to_string(),
value: "1".to_string(),
last_updated_at_ms: 0,
},
),
Event::with_event_id(
101,
"kv-clra",
1,
None,
EventKind::KeyValueSet {
key: "b".to_string(),
value: "2".to_string(),
last_updated_at_ms: 0,
},
),
],
)
.await;
assert_eq!(
provider.get_kv_value("kv-clra", "a").await.unwrap(),
Some("1".to_string()),
"key 'a' should be set"
);
assert_eq!(
provider.get_kv_value("kv-clra", "b").await.unwrap(),
Some("2".to_string()),
"key 'b' should be set"
);
ack_with_delta(
&*provider,
"kv-clra",
1,
vec![Event::with_event_id(
102,
"kv-clra",
1,
None,
EventKind::KeyValuesCleared,
)],
)
.await;
assert_eq!(provider.get_kv_value("kv-clra", "a").await.unwrap(), None);
assert_eq!(provider.get_kv_value("kv-clra", "b").await.unwrap(), None);
}
/// Asserts that reading a key that was never written on an existing instance
/// returns `None`.
pub async fn test_kv_get_nonexistent<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-none";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let missing = provider.get_kv_value(INSTANCE, "nope").await.unwrap();
    assert_eq!(missing, None);
}
pub async fn test_kv_snapshot_in_fetch<F: ProviderFactory>(factory: &F) {
let provider = factory.create_provider().await;
create_instance(&*provider, "kv-fetch").await.unwrap();
ack_with_delta(
&*provider,
"kv-fetch",
1,
vec![
Event::with_event_id(
100,
"kv-fetch",
1,
None,
EventKind::KeyValueSet {
key: "x".to_string(),
value: "10".to_string(),
last_updated_at_ms: 0,
},
),
Event::with_event_id(
101,
"kv-fetch",
1,
None,
EventKind::KeyValueSet {
key: "y".to_string(),
value: "20".to_string(),
last_updated_at_ms: 0,
},
),
],
)
.await;
complete_instance(&*provider, "kv-fetch", 1).await;
provider
.enqueue_for_orchestrator(poke_item("kv-fetch"), None)
.await
.unwrap();
let (item, lock_token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.expect("expected orchestration item");
assert_eq!(item.kv_snapshot.len(), 2);
assert_eq!(item.kv_snapshot.get("x").map(|e| &*e.value), Some("10"));
assert_eq!(item.kv_snapshot.get("y").map(|e| &*e.value), Some("20"));
provider
.ack_orchestration_item(
&lock_token,
1,
vec![],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
}
pub async fn test_kv_snapshot_after_clear_single<F: ProviderFactory>(factory: &F) {
let provider = factory.create_provider().await;
create_instance(&*provider, "kv-fclr1").await.unwrap();
ack_with_delta(
&*provider,
"kv-fclr1",
1,
vec![
Event::with_event_id(
100,
"kv-fclr1",
1,
None,
EventKind::KeyValueSet {
key: "keep".to_string(),
value: "yes".to_string(),
last_updated_at_ms: 0,
},
),
Event::with_event_id(
101,
"kv-fclr1",
1,
None,
EventKind::KeyValueSet {
key: "remove".to_string(),
value: "bye".to_string(),
last_updated_at_ms: 0,
},
),
],
)
.await;
ack_with_delta(
&*provider,
"kv-fclr1",
1,
vec![Event::with_event_id(
102,
"kv-fclr1",
1,
None,
EventKind::KeyValueCleared {
key: "remove".to_string(),
},
)],
)
.await;
complete_instance(&*provider, "kv-fclr1", 1).await;
provider
.enqueue_for_orchestrator(poke_item("kv-fclr1"), None)
.await
.unwrap();
let (item, lock_token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.expect("expected orchestration item");
assert_eq!(item.kv_snapshot.len(), 1);
assert_eq!(item.kv_snapshot.get("keep").map(|e| &*e.value), Some("yes"));
assert_eq!(item.kv_snapshot.get("remove"), None);
provider
.ack_orchestration_item(
&lock_token,
1,
vec![],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
}
pub async fn test_kv_snapshot_after_clear_all<F: ProviderFactory>(factory: &F) {
let provider = factory.create_provider().await;
create_instance(&*provider, "kv-fclra").await.unwrap();
ack_with_delta(
&*provider,
"kv-fclra",
1,
vec![
Event::with_event_id(
100,
"kv-fclra",
1,
None,
EventKind::KeyValueSet {
key: "a".to_string(),
value: "1".to_string(),
last_updated_at_ms: 0,
},
),
Event::with_event_id(
101,
"kv-fclra",
1,
None,
EventKind::KeyValueSet {
key: "b".to_string(),
value: "2".to_string(),
last_updated_at_ms: 0,
},
),
],
)
.await;
ack_with_delta(
&*provider,
"kv-fclra",
1,
vec![Event::with_event_id(
102,
"kv-fclra",
1,
None,
EventKind::KeyValuesCleared,
)],
)
.await;
provider
.enqueue_for_orchestrator(poke_item("kv-fclra"), None)
.await
.unwrap();
let (item, lock_token, _) = provider
.fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
.await
.unwrap()
.expect("expected orchestration item");
assert!(
item.kv_snapshot.is_empty(),
"expected empty snapshot, got: {:?}",
item.kv_snapshot
);
provider
.ack_orchestration_item(
&lock_token,
1,
vec![],
vec![],
vec![],
ExecutionMetadata::default(),
vec![],
)
.await
.unwrap();
}
/// Writes the same key from execution 1 and, after `continue_as_new`, from
/// execution 2, then prunes with `keep_last: Some(1)` and asserts the
/// execution-2 value is still readable.
pub async fn test_kv_execution_id_tracking<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-exec";
    const KEY: &str = "shared";

    let provider = factory.create_provider().await;
    let admin = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");
    create_instance(&*provider, INSTANCE).await.unwrap();

    let from_first = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "from_exec_1".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![from_first]).await;
    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(seen.as_deref(), Some("from_exec_1"));

    continue_as_new(&*provider, INSTANCE, 2).await;

    let from_second = Event::with_event_id(
        200,
        INSTANCE,
        2,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "from_exec_2".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 2, vec![from_second]).await;
    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(seen.as_deref(), Some("from_exec_2"), "should reflect exec 2 value");

    let keep_latest = PruneOptions {
        keep_last: Some(1),
        ..Default::default()
    };
    admin.prune_executions(INSTANCE, keep_latest).await.unwrap();

    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(
        seen.as_deref(),
        Some("from_exec_2"),
        "key must survive pruning because last-writer (exec 2) is not pruned",
    );
}
/// Writes a key in execution 1, continues as new into execution 2, writes the
/// same key again, and asserts the read returns the execution-2 value.
pub async fn test_kv_cross_execution_overwrite<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-xexec";
    const KEY: &str = "k";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let first_write = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "v1".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![first_write]).await;
    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(seen.as_deref(), Some("v1"), "should see value from exec 1");

    continue_as_new(&*provider, INSTANCE, 2).await;

    let second_write = Event::with_event_id(
        200,
        INSTANCE,
        2,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "v2".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 2, vec![second_write]).await;
    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(seen.as_deref(), Some("v2"), "exec 2 should overwrite exec 1 value");
}
/// Cycles one key across three executions: set in execution 1, cleared in
/// execution 2, set again in execution 3. The read is asserted after each step.
pub async fn test_kv_cross_execution_remove_readd<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-xrm";
    const KEY: &str = "cycle";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    // Execution 1: initial write.
    let set_in_first = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "exec1".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![set_in_first]).await;
    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(seen.as_deref(), Some("exec1"));

    // Execution 2: clear.
    continue_as_new(&*provider, INSTANCE, 2).await;
    let clear_in_second = Event::with_event_id(
        200,
        INSTANCE,
        2,
        None,
        EventKind::KeyValueCleared { key: KEY.into() },
    );
    ack_with_delta(&*provider, INSTANCE, 2, vec![clear_in_second]).await;
    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(seen, None, "key should be cleared in exec 2");

    // Execution 3: write again.
    continue_as_new(&*provider, INSTANCE, 3).await;
    let set_in_third = Event::with_event_id(
        300,
        INSTANCE,
        3,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "exec3".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 3, vec![set_in_third]).await;
    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(seen.as_deref(), Some("exec3"), "key should be re-set from exec 3");
}
/// Writes a key in execution 1, overwrites it in execution 2, prunes with
/// `keep_last: Some(1)`, and asserts the execution-2 value is still readable.
pub async fn test_kv_prune_preserves_overwritten<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-prn1";
    const KEY: &str = "survive";

    let provider = factory.create_provider().await;
    let admin = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");
    create_instance(&*provider, INSTANCE).await.unwrap();

    let original = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "v1".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![original]).await;

    continue_as_new(&*provider, INSTANCE, 2).await;

    let replacement = Event::with_event_id(
        200,
        INSTANCE,
        2,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "v2".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 2, vec![replacement]).await;

    let keep_latest = PruneOptions {
        keep_last: Some(1),
        ..Default::default()
    };
    admin.prune_executions(INSTANCE, keep_latest).await.unwrap();

    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(
        seen.as_deref(),
        Some("v2"),
        "overwritten key must survive pruning of old execution"
    );
}
/// Writes one key in execution 1 and a different key in execution 2, prunes
/// with `keep_last: Some(1)`, and asserts both keys are readable before and
/// after the prune.
pub async fn test_kv_prune_preserves_all_keys<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-prn2";

    let provider = factory.create_provider().await;
    let admin = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");
    create_instance(&*provider, INSTANCE).await.unwrap();

    // Written only by execution 1.
    let orphan_write = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: "orphan".into(),
            value: "survives".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![orphan_write]).await;

    continue_as_new(&*provider, INSTANCE, 2).await;

    // Written only by execution 2.
    let keeper_write = Event::with_event_id(
        200,
        INSTANCE,
        2,
        None,
        EventKind::KeyValueSet {
            key: "keeper".into(),
            value: "alive".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 2, vec![keeper_write]).await;

    let orphan = provider.get_kv_value(INSTANCE, "orphan").await.unwrap();
    assert_eq!(orphan.as_deref(), Some("survives"));
    let keeper = provider.get_kv_value(INSTANCE, "keeper").await.unwrap();
    assert_eq!(keeper.as_deref(), Some("alive"));

    let keep_latest = PruneOptions {
        keep_last: Some(1),
        ..Default::default()
    };
    admin.prune_executions(INSTANCE, keep_latest).await.unwrap();

    let orphan = provider.get_kv_value(INSTANCE, "orphan").await.unwrap();
    assert_eq!(
        orphan.as_deref(),
        Some("survives"),
        "KV entries must survive execution pruning (instance-scoped lifetime)",
    );
    let keeper = provider.get_kv_value(INSTANCE, "keeper").await.unwrap();
    assert_eq!(keeper.as_deref(), Some("alive"), "exec 2 key must survive");
}
/// Writes the same key name with different values on two instances and asserts
/// each instance reads back its own value.
pub async fn test_kv_instance_isolation<F: ProviderFactory>(factory: &F) {
    const INSTANCE_A: &str = "kv-iso-a";
    const INSTANCE_B: &str = "kv-iso-b";
    const KEY: &str = "shared_name";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE_A).await.unwrap();
    create_instance(&*provider, INSTANCE_B).await.unwrap();

    let write_a = Event::with_event_id(
        100,
        INSTANCE_A,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "from_a".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE_A, 1, vec![write_a]).await;

    let write_b = Event::with_event_id(
        100,
        INSTANCE_B,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "from_b".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE_B, 1, vec![write_b]).await;

    let seen_a = provider.get_kv_value(INSTANCE_A, KEY).await.unwrap();
    assert_eq!(seen_a.as_deref(), Some("from_a"));
    let seen_b = provider.get_kv_value(INSTANCE_B, KEY).await.unwrap();
    assert_eq!(seen_b.as_deref(), Some("from_b"));
}
/// Writes a key, completes the instance, deletes it via
/// `delete_instance(.., false)`, and asserts the key reads as `None` afterwards.
pub async fn test_kv_delete_instance_cascades<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-del";
    const KEY: &str = "doomed";

    let provider = factory.create_provider().await;
    let admin = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");
    create_instance(&*provider, INSTANCE).await.unwrap();

    let write = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "bye".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![write]).await;
    complete_instance(&*provider, INSTANCE, 1).await;

    let before = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(before.as_deref(), Some("bye"), "value should exist before deletion");

    admin.delete_instance(INSTANCE, false).await.unwrap();

    let after = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(after, None, "KV should be gone after instance deletion");
}
/// Acks a `KeyValueCleared` event for a key that was never set and asserts the
/// ack succeeds and the key reads as `None`.
pub async fn test_kv_clear_nonexistent_key<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-clrne";
    const KEY: &str = "never_existed";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let clear = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueCleared { key: KEY.into() },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![clear]).await;

    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(seen, None);
}
/// Asserts that reading from an instance that was never created returns
/// `Ok(None)` rather than an error.
pub async fn test_kv_get_unknown_instance<F: ProviderFactory>(factory: &F) {
    let provider = factory.create_provider().await;

    let lookup = provider.get_kv_value("no-such-instance", "key").await;
    assert_eq!(lookup.unwrap(), None);
}
pub async fn test_kv_set_after_clear<F: ProviderFactory>(factory: &F) {
let provider = factory.create_provider().await;
create_instance(&*provider, "kv-sac").await.unwrap();
ack_with_delta(
&*provider,
"kv-sac",
1,
vec![
Event::with_event_id(
100,
"kv-sac",
1,
None,
EventKind::KeyValueSet {
key: "old_a".to_string(),
value: "1".to_string(),
last_updated_at_ms: 0,
},
),
Event::with_event_id(
101,
"kv-sac",
1,
None,
EventKind::KeyValueSet {
key: "old_b".to_string(),
value: "2".to_string(),
last_updated_at_ms: 0,
},
),
],
)
.await;
ack_with_delta(
&*provider,
"kv-sac",
1,
vec![
Event::with_event_id(102, "kv-sac", 1, None, EventKind::KeyValuesCleared),
Event::with_event_id(
103,
"kv-sac",
1,
None,
EventKind::KeyValueSet {
key: "new_x".to_string(),
value: "fresh".to_string(),
last_updated_at_ms: 0,
},
),
],
)
.await;
assert_eq!(provider.get_kv_value("kv-sac", "old_a").await.unwrap(), None);
assert_eq!(provider.get_kv_value("kv-sac", "old_b").await.unwrap(), None);
assert_eq!(
provider.get_kv_value("kv-sac", "new_x").await.unwrap(),
Some("fresh".to_string()),
);
}
/// Writes an empty string as a value and asserts it reads back as `Some("")`,
/// not `None`.
pub async fn test_kv_empty_value<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-empty";
    const KEY: &str = "blank";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let write_blank = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: String::new(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![write_blank]).await;

    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(seen.as_deref(), Some(""), "empty string is a valid value, not None");
}
/// Writes a 16 KiB value (`"x"` repeated) and asserts it reads back unchanged.
pub async fn test_kv_large_value<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-big";
    const KEY: &str = "payload";
    const PAYLOAD_LEN: usize = 16 * 1024;

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let payload = "x".repeat(PAYLOAD_LEN);
    let write = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: payload.clone(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![write]).await;

    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(seen.as_deref(), Some(payload.as_str()));
}
/// Writes several keys containing spaces, non-ASCII text, dots, slashes and an
/// emoji in a single delta, then asserts each key reads back its own value.
pub async fn test_kv_special_chars_in_key<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-chars";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    // The Japanese key was previously stored as mojibake (UTF-8 bytes decoded
    // as Windows-1252); it is restored to the intended katakana text here.
    let keys = [
        ("key with spaces", "v1"),
        ("日本語キー", "v2"),
        ("dotted.key.name", "v3"),
        ("path/like/key", "v4"),
        ("emoji🎉key", "v5"),
    ];

    // Event ids are assigned sequentially starting at 100, in table order.
    let events: Vec<_> = keys
        .iter()
        .enumerate()
        .map(|(i, (k, v))| {
            Event::with_event_id(
                100 + i as u64,
                INSTANCE,
                1,
                None,
                EventKind::KeyValueSet {
                    key: k.to_string(),
                    value: v.to_string(),
                    last_updated_at_ms: 0,
                },
            )
        })
        .collect();
    ack_with_delta(&*provider, INSTANCE, 1, events).await;

    for (k, v) in &keys {
        let val = provider.get_kv_value(INSTANCE, k).await.unwrap();
        assert_eq!(val, Some(v.to_string()), "key '{k}' should be retrievable");
    }
}
/// Fetches an orchestration item for an instance with no KV writes and asserts
/// its `kv_snapshot` is empty.
pub async fn test_kv_snapshot_empty<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-sempty";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    provider
        .enqueue_for_orchestrator(poke_item(INSTANCE), None)
        .await
        .unwrap();
    let fetched = provider
        .fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
        .await
        .unwrap();
    let (item, lock_token, _) = fetched.expect("expected orchestration item");

    assert!(
        item.kv_snapshot.is_empty(),
        "expected empty snapshot for fresh instance"
    );

    let no_metadata = ExecutionMetadata::default();
    provider
        .ack_orchestration_item(&lock_token, 1, vec![], vec![], vec![], no_metadata, vec![])
        .await
        .unwrap();
}
/// Writes key `A` in execution 1 and key `B` in execution 2, then asserts the
/// next fetched `kv_snapshot` contains `A` but not `B`.
pub async fn test_kv_snapshot_cross_execution<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-scross";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let write_a = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: "A".into(),
            value: "from_exec1".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![write_a]).await;

    continue_as_new(&*provider, INSTANCE, 2).await;

    let write_b = Event::with_event_id(
        200,
        INSTANCE,
        2,
        None,
        EventKind::KeyValueSet {
            key: "B".into(),
            value: "from_exec2".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 2, vec![write_b]).await;

    provider
        .enqueue_for_orchestrator(poke_item(INSTANCE), None)
        .await
        .unwrap();
    let fetched = provider
        .fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
        .await
        .unwrap();
    let (item, lock_token, _) = fetched.expect("expected orchestration item");

    let snapshot = &item.kv_snapshot;
    assert_eq!(
        snapshot.len(),
        1,
        "snapshot should only contain prior-execution state"
    );
    assert_eq!(snapshot.get("A").map(|entry| &*entry.value), Some("from_exec1"));
    assert_eq!(
        snapshot.get("B"),
        None,
        "current-execution key should not be in snapshot"
    );

    let no_metadata = ExecutionMetadata::default();
    provider
        .ack_orchestration_item(&lock_token, 2, vec![], vec![], vec![], no_metadata, vec![])
        .await
        .unwrap();
}
/// Writes a key in the only execution, prunes with `keep_last: Some(1)`, and
/// asserts the key is still readable.
pub async fn test_kv_prune_current_execution_protected<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-prncur";
    const KEY: &str = "alive";

    let provider = factory.create_provider().await;
    let admin = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");
    create_instance(&*provider, INSTANCE).await.unwrap();

    let write = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "yes".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![write]).await;

    let keep_latest = PruneOptions {
        keep_last: Some(1),
        ..Default::default()
    };
    admin.prune_executions(INSTANCE, keep_latest).await.unwrap();

    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(
        seen.as_deref(),
        Some("yes"),
        "current execution KV must survive pruning"
    );
}
/// Creates a parent instance and a child orchestration linked to it, writes one
/// key on each, completes both, deletes the parent via
/// `delete_instance(.., false)`, and asserts both keys read as `None`.
pub async fn test_kv_delete_instance_with_children<F: ProviderFactory>(factory: &F) {
    const PARENT: &str = "kv-parent";
    const CHILD: &str = "kv-child";
    const ORCHESTRATION: &str = "TestOrch";
    const VERSION: &str = "1.0.0";

    let provider = factory.create_provider().await;
    let admin = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");

    // Parent: create and write one key.
    create_instance(&*provider, PARENT).await.unwrap();
    let parent_write = Event::with_event_id(
        100,
        PARENT,
        1,
        None,
        EventKind::KeyValueSet {
            key: "parent_key".into(),
            value: "parent_val".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, PARENT, 1, vec![parent_write]).await;

    // Child: start it by hand so the parent linkage can be supplied.
    let child_start = WorkItem::StartOrchestration {
        instance: CHILD.to_string(),
        orchestration: ORCHESTRATION.to_string(),
        version: Some(VERSION.to_string()),
        input: "{}".to_string(),
        parent_instance: Some(PARENT.to_string()),
        parent_id: Some(1),
        execution_id: crate::INITIAL_EXECUTION_ID,
    };
    provider.enqueue_for_orchestrator(child_start, None).await.unwrap();

    let fetched_child = provider
        .fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
        .await
        .unwrap();
    let (_, child_lock, _) = fetched_child.expect("expected child orchestration item");

    let child_started = Event::with_event_id(
        1,
        CHILD,
        1,
        None,
        EventKind::OrchestrationStarted {
            name: ORCHESTRATION.to_string(),
            version: VERSION.to_string(),
            input: "{}".to_string(),
            parent_instance: Some(PARENT.to_string()),
            parent_id: Some(1),
            carry_forward_events: None,
            initial_custom_status: None,
        },
    );
    let child_metadata = ExecutionMetadata {
        parent_instance_id: Some(PARENT.to_string()),
        orchestration_name: Some(ORCHESTRATION.to_string()),
        orchestration_version: Some(VERSION.to_string()),
        ..Default::default()
    };
    provider
        .ack_orchestration_item(
            &child_lock,
            1,
            vec![child_started],
            vec![],
            vec![],
            child_metadata,
            vec![],
        )
        .await
        .unwrap();

    let child_write = Event::with_event_id(
        100,
        CHILD,
        1,
        None,
        EventKind::KeyValueSet {
            key: "child_key".into(),
            value: "child_val".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, CHILD, 1, vec![child_write]).await;

    complete_instance(&*provider, CHILD, 1).await;
    complete_instance(&*provider, PARENT, 1).await;

    admin.delete_instance(PARENT, false).await.unwrap();

    let parent_value = provider.get_kv_value(PARENT, "parent_key").await.unwrap();
    assert_eq!(parent_value, None, "parent KV should be removed");
    let child_value = provider.get_kv_value(CHILD, "child_key").await.unwrap();
    assert_eq!(
        child_value, None,
        "child KV should be removed when parent is deleted",
    );
}
/// Writes the same key on two instances, acks `KeyValuesCleared` on the first,
/// and asserts the first reads `None` while the second still reads its value.
pub async fn test_kv_clear_isolation<F: ProviderFactory>(factory: &F) {
    const INSTANCE_A: &str = "kv-ciso-a";
    const INSTANCE_B: &str = "kv-ciso-b";
    const KEY: &str = "shared";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE_A).await.unwrap();
    create_instance(&*provider, INSTANCE_B).await.unwrap();

    let write_a = Event::with_event_id(
        100,
        INSTANCE_A,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "from_a".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE_A, 1, vec![write_a]).await;

    let write_b = Event::with_event_id(
        100,
        INSTANCE_B,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "from_b".into(),
            last_updated_at_ms: 0,
        },
    );
    ack_with_delta(&*provider, INSTANCE_B, 1, vec![write_b]).await;

    let clear_a = Event::with_event_id(101, INSTANCE_A, 1, None, EventKind::KeyValuesCleared);
    ack_with_delta(&*provider, INSTANCE_A, 1, vec![clear_a]).await;

    let seen_a = provider.get_kv_value(INSTANCE_A, KEY).await.unwrap();
    assert_eq!(seen_a, None);
    let seen_b = provider.get_kv_value(INSTANCE_B, KEY).await.unwrap();
    assert_eq!(
        seen_b.as_deref(),
        Some("from_b"),
        "clearing instance A must not affect instance B",
    );
}
/// Writes a key in execution 1 without completing it, asserts `get_kv_value`
/// sees the value, and asserts the next fetched `kv_snapshot` is empty.
pub async fn test_kv_delta_snapshot_excludes_current_execution<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-ds1";
    const KEY: &str = "counter";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let write = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "1".into(),
            last_updated_at_ms: 100,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![write]).await;

    let client_view = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(
        client_view.as_deref(),
        Some("1"),
        "client read should see current-execution value",
    );

    provider
        .enqueue_for_orchestrator(poke_item(INSTANCE), None)
        .await
        .unwrap();
    let fetched = provider
        .fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
        .await
        .unwrap();
    let (item, lock_token, _) = fetched.expect("expected orchestration item");

    assert!(
        item.kv_snapshot.is_empty(),
        "snapshot must be empty — counter was set in current execution (not completed), \
         should not be in snapshot. Got: {:?}",
        item.kv_snapshot,
    );

    let no_metadata = ExecutionMetadata::default();
    provider
        .ack_orchestration_item(&lock_token, 1, vec![], vec![], vec![], no_metadata, vec![])
        .await
        .unwrap();
}
/// Writes a key in execution 1, continues as new into execution 2, and asserts
/// the next fetched `kv_snapshot` contains the execution-1 value.
pub async fn test_kv_delta_snapshot_includes_completed_execution<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-ds2";
    const KEY: &str = "counter";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let write = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "5".into(),
            last_updated_at_ms: 100,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![write]).await;

    continue_as_new(&*provider, INSTANCE, 2).await;

    provider
        .enqueue_for_orchestrator(poke_item(INSTANCE), None)
        .await
        .unwrap();
    let fetched = provider
        .fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
        .await
        .unwrap();
    let (item, lock_token, _) = fetched.expect("expected orchestration item");

    let carried = item.kv_snapshot.get(KEY).map(|entry| &*entry.value);
    assert_eq!(
        carried,
        Some("5"),
        "snapshot must contain prior-execution value after CAN",
    );

    let no_metadata = ExecutionMetadata::default();
    provider
        .ack_orchestration_item(&lock_token, 2, vec![], vec![], vec![], no_metadata, vec![])
        .await
        .unwrap();
}
/// Writes `color` and `size` in execution 1, completes it, then acks an
/// execution-2 delta that overwrites `color` and adds `shape`. Asserts that
/// `get_kv_value` returns the new `color`, the old `size`, and the new `shape`.
pub async fn test_kv_delta_client_reads_merged<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-dm";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    // Execution 1.
    let color_red = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: "color".into(),
            value: "red".into(),
            last_updated_at_ms: 100,
        },
    );
    let size_large = Event::with_event_id(
        101,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: "size".into(),
            value: "large".into(),
            last_updated_at_ms: 101,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![color_red, size_large]).await;
    complete_instance(&*provider, INSTANCE, 1).await;

    // Execution 2.
    let color_blue = Event::with_event_id(
        200,
        INSTANCE,
        2,
        None,
        EventKind::KeyValueSet {
            key: "color".into(),
            value: "blue".into(),
            last_updated_at_ms: 200,
        },
    );
    let shape_circle = Event::with_event_id(
        201,
        INSTANCE,
        2,
        None,
        EventKind::KeyValueSet {
            key: "shape".into(),
            value: "circle".into(),
            last_updated_at_ms: 201,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 2, vec![color_blue, shape_circle]).await;

    let color = provider.get_kv_value(INSTANCE, "color").await.unwrap();
    assert_eq!(color.as_deref(), Some("blue"), "delta should override kv_store value");

    let size = provider.get_kv_value(INSTANCE, "size").await.unwrap();
    assert_eq!(
        size.as_deref(),
        Some("large"),
        "kv_store value should be visible when no delta exists",
    );

    let shape = provider.get_kv_value(INSTANCE, "shape").await.unwrap();
    assert_eq!(shape.as_deref(), Some("circle"), "delta-only key should be visible");
}
/// Writes a key in execution 1, completes it, acks a `KeyValueCleared` for that
/// key under execution 2, and asserts the key reads as `None`.
pub async fn test_kv_delta_tombstone_overrides_store<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-dt";
    const KEY: &str = "temp";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let write = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "exists".into(),
            last_updated_at_ms: 100,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![write]).await;
    complete_instance(&*provider, INSTANCE, 1).await;

    let tombstone = Event::with_event_id(
        200,
        INSTANCE,
        2,
        None,
        EventKind::KeyValueCleared { key: KEY.into() },
    );
    ack_with_delta(&*provider, INSTANCE, 2, vec![tombstone]).await;

    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(seen, None, "tombstone in delta must override kv_store value");
}
/// Writes keys `a` and `b` in execution 1, completes it, then acks an
/// execution-2 delta of `KeyValuesCleared` followed by a set of `c`. Asserts
/// `a` and `b` read as `None` and `c` reads its value.
pub async fn test_kv_delta_clear_all_tombstones_store<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-dca";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    // Execution 1.
    let set_a = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: "a".into(),
            value: "1".into(),
            last_updated_at_ms: 100,
        },
    );
    let set_b = Event::with_event_id(
        101,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: "b".into(),
            value: "2".into(),
            last_updated_at_ms: 101,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![set_a, set_b]).await;
    complete_instance(&*provider, INSTANCE, 1).await;

    // Execution 2: clear-all first, then the new key.
    let clear_all = Event::with_event_id(200, INSTANCE, 2, None, EventKind::KeyValuesCleared);
    let set_c = Event::with_event_id(
        201,
        INSTANCE,
        2,
        None,
        EventKind::KeyValueSet {
            key: "c".into(),
            value: "3".into(),
            last_updated_at_ms: 201,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 2, vec![clear_all, set_c]).await;

    let a = provider.get_kv_value(INSTANCE, "a").await.unwrap();
    assert_eq!(a, None, "clear_all must tombstone kv_store key 'a'");

    let b = provider.get_kv_value(INSTANCE, "b").await.unwrap();
    assert_eq!(b, None, "clear_all must tombstone kv_store key 'b'");

    let c = provider.get_kv_value(INSTANCE, "c").await.unwrap();
    assert_eq!(c.as_deref(), Some("3"), "key set after clear_all should be visible");
}
/// Writes a key in execution 1, completes the instance, and asserts the key is
/// still readable afterwards.
pub async fn test_kv_delta_merged_on_completion<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-dmc";
    const KEY: &str = "result";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let write = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: KEY.into(),
            value: "42".into(),
            last_updated_at_ms: 100,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![write]).await;
    complete_instance(&*provider, INSTANCE, 1).await;

    let seen = provider.get_kv_value(INSTANCE, KEY).await.unwrap();
    assert_eq!(
        seen.as_deref(),
        Some("42"),
        "value should be in kv_store after completion merge",
    );
}
/// Writes `version` and `progress` in execution 1, overwrites `progress` in a
/// second ack, continues as new into execution 2, and asserts the next fetched
/// `kv_snapshot` holds `version` and the latest `progress`.
pub async fn test_kv_delta_merged_on_can<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-dmc2";

    let provider = factory.create_provider().await;
    create_instance(&*provider, INSTANCE).await.unwrap();

    let set_version = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: "version".into(),
            value: "1".into(),
            last_updated_at_ms: 100,
        },
    );
    let progress_half = Event::with_event_id(
        101,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: "progress".into(),
            value: "50%".into(),
            last_updated_at_ms: 101,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![set_version, progress_half]).await;

    let progress_done = Event::with_event_id(
        102,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: "progress".into(),
            value: "100%".into(),
            last_updated_at_ms: 102,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![progress_done]).await;

    continue_as_new(&*provider, INSTANCE, 2).await;

    provider
        .enqueue_for_orchestrator(poke_item(INSTANCE), None)
        .await
        .unwrap();
    let fetched = provider
        .fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
        .await
        .unwrap();
    let (item, lock_token, _) = fetched.expect("expected orchestration item");

    let snapshot = &item.kv_snapshot;
    assert_eq!(
        snapshot.get("version").map(|entry| &*entry.value),
        Some("1"),
        "version should carry over via kv_store after CAN merge",
    );
    assert_eq!(
        snapshot.get("progress").map(|entry| &*entry.value),
        Some("100%"),
        "latest progress value should carry over",
    );

    let no_metadata = ExecutionMetadata::default();
    provider
        .ack_orchestration_item(&lock_token, 2, vec![], vec![], vec![], no_metadata, vec![])
        .await
        .unwrap();
}
/// Checks that deleting an instance also removes the KV data written for it.
///
/// Execution 1 of instance `"kv-ddel"` acks a `KeyValueSet` for `data = "important"`;
/// the value is confirmed readable, the instance is deleted through the management
/// capability, and `get_kv_value` must then return `None`.
///
/// NOTE(review): the `true` passed to `delete_instance` is presumably a "force" flag —
/// confirm against the management API.
///
/// # Panics
/// Panics if the provider exposes no management capability, any provider call fails,
/// or either assertion does not hold.
pub async fn test_kv_delta_delete_instance_cascades<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-ddel";

    let provider = factory.create_provider().await;
    let mgmt = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");

    create_instance(&*provider, INSTANCE).await.unwrap();

    let set_data = Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: String::from("data"),
            value: String::from("important"),
            last_updated_at_ms: 100,
        },
    );
    ack_with_delta(&*provider, INSTANCE, 1, vec![set_data]).await;

    // Sanity check: the value is visible before the instance is deleted.
    let before_delete = provider.get_kv_value(INSTANCE, "data").await.unwrap();
    assert_eq!(before_delete, Some(String::from("important")));

    mgmt.delete_instance(INSTANCE, true).await.unwrap();

    let after_delete = provider.get_kv_value(INSTANCE, "data").await.unwrap();
    assert_eq!(
        after_delete, None,
        "KV delta must be cleaned up on instance deletion"
    );
}
/// Checks that pruning old executions does not remove KV keys written by them.
///
/// Sequence for instance `"kv-dprn"`:
/// 1. Execution 1 sets `stale_config = "from_exec_1"`, then continues as new into
///    execution 2.
/// 2. Execution 2 sets `other = "unrelated"`, then continues as new into execution 3.
/// 3. Both keys are confirmed readable via `get_kv_value`.
/// 4. `prune_executions` is called with `keep_last: Some(1)`.
/// 5. Both keys must still be readable via `get_kv_value`, and both must appear in the
///    `kv_snapshot` of the next fetched orchestration item.
/// 6. The fetched item is acked against execution 3 with an empty delta.
///
/// # Panics
/// Panics if the provider exposes no management capability, any provider call fails,
/// no orchestration item is returned, or any assertion does not hold.
pub async fn test_kv_delta_prune_untouched_key_survives<F: ProviderFactory>(factory: &F) {
    const INSTANCE: &str = "kv-dprn";

    let provider = factory.create_provider().await;
    let mgmt = provider
        .as_management_capability()
        .expect("Provider should implement ProviderAdmin");

    create_instance(&*provider, INSTANCE).await.unwrap();

    // Execution 1 writes a key that no later execution touches again.
    let exec1_delta = vec![Event::with_event_id(
        100,
        INSTANCE,
        1,
        None,
        EventKind::KeyValueSet {
            key: String::from("stale_config"),
            value: String::from("from_exec_1"),
            last_updated_at_ms: 100,
        },
    )];
    ack_with_delta(&*provider, INSTANCE, 1, exec1_delta).await;
    continue_as_new(&*provider, INSTANCE, 2).await;

    // Execution 2 writes a different key.
    let exec2_delta = vec![Event::with_event_id(
        200,
        INSTANCE,
        2,
        None,
        EventKind::KeyValueSet {
            key: String::from("other"),
            value: String::from("unrelated"),
            last_updated_at_ms: 200,
        },
    )];
    ack_with_delta(&*provider, INSTANCE, 2, exec2_delta).await;
    continue_as_new(&*provider, INSTANCE, 3).await;

    // Baseline: both keys are readable before pruning.
    let stale_before = provider.get_kv_value(INSTANCE, "stale_config").await.unwrap();
    assert_eq!(stale_before, Some(String::from("from_exec_1")));
    let other_before = provider.get_kv_value(INSTANCE, "other").await.unwrap();
    assert_eq!(other_before, Some(String::from("unrelated")));

    let prune_options = PruneOptions {
        keep_last: Some(1),
        ..Default::default()
    };
    mgmt.prune_executions(INSTANCE, prune_options).await.unwrap();

    // Direct reads after pruning.
    let stale_after = provider.get_kv_value(INSTANCE, "stale_config").await.unwrap();
    assert_eq!(
        stale_after,
        Some(String::from("from_exec_1")),
        "KV key from pruned execution must survive — KV lifetime is instance-scoped"
    );
    let other_after = provider.get_kv_value(INSTANCE, "other").await.unwrap();
    assert_eq!(
        other_after,
        Some(String::from("unrelated")),
        "KV key from pruned execution 2 must also survive"
    );

    // The snapshot attached to the next fetched item must agree with the direct reads.
    provider
        .enqueue_for_orchestrator(poke_item(INSTANCE), None)
        .await
        .unwrap();
    let fetched = provider
        .fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
        .await
        .unwrap();
    let (item, lock_token, _) = fetched.expect("expected orchestration item");

    let snapshot_stale = item
        .kv_snapshot
        .get("stale_config")
        .map(|entry| &*entry.value);
    assert_eq!(
        snapshot_stale,
        Some("from_exec_1"),
        "snapshot must include untouched key from pruned execution"
    );
    let snapshot_other = item.kv_snapshot.get("other").map(|entry| &*entry.value);
    assert_eq!(
        snapshot_other,
        Some("unrelated"),
        "snapshot must include key from pruned execution 2"
    );

    // Ack the fetched item with an empty delta against execution 3.
    provider
        .ack_orchestration_item(
            &lock_token,
            3,
            Vec::new(),
            Vec::new(),
            Vec::new(),
            ExecutionMetadata::default(),
            Vec::new(),
        )
        .await
        .unwrap();
}
/// Drives `instance` through a continue-as-new transition into `new_execution_id`.
///
/// Steps, in order:
/// 1. Enqueue a poke, fetch the resulting orchestration item, and ack it against the
///    previous execution (`new_execution_id - 1`) with an empty history delta and
///    metadata status `"ContinuedAsNew"`.
/// 2. Enqueue a `WorkItem::ContinueAsNew`, fetch again, and ack against
///    `new_execution_id` with a single `OrchestrationStarted` event (event id 1).
///
/// The orchestration name and version are fixed to `"TestOrch"` / `"1.0.0"`.
///
/// # Panics
/// Panics if `new_execution_id` is 0 (there is no previous execution to close), if
/// any provider call returns an error, or if either fetch yields no item.
async fn continue_as_new(provider: &dyn crate::providers::Provider, instance: &str, new_execution_id: u64) {
    const ORCH_NAME: &str = "TestOrch";
    const ORCH_VERSION: &str = "1.0.0";

    // A plain `- 1` would wrap to u64::MAX in release builds when given 0; fail loudly
    // with a clear message instead.
    let prev_execution_id = new_execution_id
        .checked_sub(1)
        .expect("continue_as_new requires new_execution_id >= 1 so a previous execution exists");

    // Step 1: close out the previous execution as ContinuedAsNew.
    provider
        .enqueue_for_orchestrator(poke_item(instance), None)
        .await
        .unwrap();
    let (_, prev_lock, _) = provider
        .fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
        .await
        .unwrap()
        .expect("expected orchestration item to mark ContinuedAsNew");
    provider
        .ack_orchestration_item(
            &prev_lock,
            prev_execution_id,
            vec![],
            vec![],
            vec![],
            ExecutionMetadata {
                status: Some("ContinuedAsNew".to_string()),
                orchestration_name: Some(ORCH_NAME.to_string()),
                orchestration_version: Some(ORCH_VERSION.to_string()),
                ..Default::default()
            },
            vec![],
        )
        .await
        .unwrap();

    // Step 2: deliver the ContinueAsNew work item and start the new execution.
    let work_item = WorkItem::ContinueAsNew {
        instance: instance.to_string(),
        orchestration: ORCH_NAME.to_string(),
        input: "{}".to_string(),
        version: Some(ORCH_VERSION.to_string()),
        carry_forward_events: vec![],
        initial_custom_status: None,
    };
    provider.enqueue_for_orchestrator(work_item, None).await.unwrap();
    let (_, new_lock, _) = provider
        .fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
        .await
        .unwrap()
        .expect("expected orchestration item for ContinueAsNew");

    let started = Event::with_event_id(
        1,
        instance,
        new_execution_id,
        None,
        EventKind::OrchestrationStarted {
            name: ORCH_NAME.to_string(),
            version: ORCH_VERSION.to_string(),
            input: "{}".to_string(),
            parent_instance: None,
            parent_id: None,
            carry_forward_events: None,
            initial_custom_status: None,
        },
    );
    provider
        .ack_orchestration_item(
            &new_lock,
            new_execution_id,
            vec![started],
            vec![],
            vec![],
            ExecutionMetadata {
                orchestration_name: Some(ORCH_NAME.to_string()),
                orchestration_version: Some(ORCH_VERSION.to_string()),
                ..Default::default()
            },
            vec![],
        )
        .await
        .unwrap();
}
/// Marks `execution_id` of `instance` as completed.
///
/// Enqueues a poke, fetches the resulting orchestration item, and acks it with a
/// single `OrchestrationCompleted` event (event id 9999, output `"done"`) and
/// metadata carrying status `"Completed"`, output `"done"`, and the fixed
/// orchestration name/version `"TestOrch"` / `"1.0.0"`.
///
/// # Panics
/// Panics if any provider call returns an error or the fetch yields no item.
async fn complete_instance(provider: &dyn crate::providers::Provider, instance: &str, execution_id: u64) {
    // Both values are plain data, so they can be prepared before touching the provider.
    let completed_event = Event::with_event_id(
        9999,
        instance,
        execution_id,
        None,
        EventKind::OrchestrationCompleted {
            output: String::from("done"),
        },
    );
    let completed_metadata = ExecutionMetadata {
        status: Some(String::from("Completed")),
        output: Some(String::from("done")),
        orchestration_name: Some(String::from("TestOrch")),
        orchestration_version: Some(String::from("1.0.0")),
        ..Default::default()
    };

    provider
        .enqueue_for_orchestrator(poke_item(instance), None)
        .await
        .unwrap();

    let fetched = provider
        .fetch_orchestration_item(Duration::from_secs(30), Duration::ZERO, None)
        .await
        .unwrap();
    let (_, lock_token, _) = fetched.expect("expected orchestration item for completion");

    provider
        .ack_orchestration_item(
            &lock_token,
            execution_id,
            vec![completed_event],
            Vec::new(),
            Vec::new(),
            completed_metadata,
            Vec::new(),
        )
        .await
        .unwrap();
}