use std::io::{BufReader, Cursor};
use pylon_plugin::builtin::cache::CachePlugin;
use pylon_plugin::builtin::file_storage::FileStoragePlugin;
use pylon_runtime::cron::CronExpr;
use pylon_runtime::resp::parse_resp;
use pylon_runtime::workflows::{WorkflowDef, WorkflowEngine, WorkflowStatus};
#[test]
fn resp_parser_doesnt_panic_on_garbage() {
let inputs: Vec<&[u8]> = vec![
b"",
b"\r\n",
b"garbage",
b"+\r\n",
b"$-1\r\n",
b"*-1\r\n",
b"$999999999\r\n",
b"*0\r\n",
b":\r\n",
b":abc\r\n",
b"+OK", b"-ERR", b"$0\r\n\r\n",
b"\x00\x01\x02\x03",
b"+++\r\n",
b"---\r\n",
b"$2\r\nab\r\n",
b"$2\r\na", b"*2\r\n+OK\r\n", b"*1\r\n*1\r\n*1\r\n+deep\r\n", ];
let star_repeat = "*".repeat(1000);
let plus_repeat = "+".repeat(10000);
for input in inputs {
let mut reader = BufReader::new(Cursor::new(input));
let _ = parse_resp(&mut reader); }
for large in [star_repeat.as_bytes(), plus_repeat.as_bytes()] {
let mut reader = BufReader::new(Cursor::new(large));
let _ = parse_resp(&mut reader);
}
}
#[test]
fn resp_roundtrip_property() {
use pylon_runtime::resp::RespValue;
let values = vec![
RespValue::SimpleString(String::new()),
RespValue::SimpleString("hello world".into()),
RespValue::Error("ERR bad".into()),
RespValue::Integer(0),
RespValue::Integer(-1),
RespValue::Integer(i64::MAX),
RespValue::Integer(i64::MIN),
RespValue::BulkString(None),
RespValue::BulkString(Some(String::new())),
RespValue::BulkString(Some("x".repeat(10_000))),
RespValue::Array(None),
RespValue::Array(Some(vec![])),
RespValue::Array(Some(vec![
RespValue::Integer(1),
RespValue::BulkString(Some("two".into())),
RespValue::BulkString(None),
])),
];
for val in &values {
let bytes = val.serialize();
let mut reader = BufReader::new(Cursor::new(&bytes));
let parsed = parse_resp(&mut reader).expect("roundtrip parse should succeed");
assert_eq!(&parsed, val, "roundtrip mismatch for {val:?}");
}
}
#[test]
fn cron_parser_doesnt_panic_on_garbage() {
let inputs = vec![
"",
"* * * * *",
"*/0 * * * *",
"99 99 99 99 99",
"-1 * * * *",
"a b c d e",
"* * * *", "* * * * * *", "0-60 * * * *", "*/abc * * * *",
",,,, * * * *",
"1-2-3 * * * *",
"0, 5, 10 * * * *", " ",
"\n\t",
"0 0 31 2 *", ];
let star_repeat = "*".repeat(100);
for input in inputs {
let _ = CronExpr::parse(input); }
let _ = CronExpr::parse(&star_repeat);
}
#[test]
fn cron_matches_edge_timestamps() {
let cron = CronExpr::parse("* * * * *").unwrap();
let timestamps: Vec<u64> = vec![
0,
1,
86400,
86400 * 365 * 50, 1_000_000_000, 2_000_000_000, u64::MAX / 2,
];
for ts in timestamps {
let _ = cron.matches(ts); }
}
#[test]
fn cache_doesnt_panic_on_weird_keys() {
let cache = CachePlugin::new(1000);
let weird_keys: Vec<String> = vec![
String::new(),
" ".into(),
"\0".into(),
"\n\r\t".into(),
"a".repeat(10_000),
"key with spaces".into(),
"key\0null".into(),
"\u{65e5}\u{672c}\u{8a9e}\u{30ad}\u{30fc}".into(),
"../../../etc/passwd".into(),
"key;DROP TABLE".into(),
];
for key in &weird_keys {
cache.set(key, "value", None);
let _ = cache.get(key);
let _ = cache.incr(key); cache.del(key);
let _ = cache.exists(key);
let _ = cache.ttl(key);
let _ = cache.key_type(key);
}
}
#[test]
fn concurrent_incr_atomicity() {
use std::sync::Arc;
use std::thread;
let cache = Arc::new(CachePlugin::new(100_000));
let threads: Vec<_> = (0..10)
.map(|_| {
let cache = Arc::clone(&cache);
thread::spawn(move || {
for _ in 0..1000 {
cache.incr("atomic_counter").unwrap();
}
})
})
.collect();
for t in threads {
t.join().unwrap();
}
let val = cache.get("atomic_counter").unwrap();
assert_eq!(val, "10000", "expected 10000, got {val}");
}
#[test]
fn concurrent_cache_mixed_ops() {
use std::sync::Arc;
use std::thread;
let cache = Arc::new(CachePlugin::new(100_000));
let threads: Vec<_> = (0..10)
.map(|i| {
let cache = Arc::clone(&cache);
thread::spawn(move || {
for j in 0..1000 {
let key = format!("key_{}_{}", i, j);
cache.set(&key, "value", None);
let _ = cache.get(&key);
if j % 3 == 0 {
cache.del(&key);
}
}
})
})
.collect();
for t in threads {
t.join().unwrap();
}
assert!(cache.dbsize() > 0);
}
#[test]
fn file_storage_rejects_traversal_variants() {
let dir = std::env::temp_dir().join("pylon_fuzz_file_storage");
let storage = FileStoragePlugin::local(&dir).unwrap();
let bad_ids = vec![
"../etc/passwd",
"..\\windows\\system32",
"foo/../bar",
"foo/bar",
"foo\\bar",
".hidden",
"..dotdot",
"%2e%2e/etc/passwd",
];
for id in bad_ids {
let result = storage.download(id);
assert!(
result.is_err(),
"download({id:?}) should be rejected but returned Ok"
);
}
let result = storage.download("normal_file");
assert!(result.is_err()); }
#[test]
fn workflow_state_machine_retry_exhaustion() {
let engine = WorkflowEngine::new("http://localhost:19999", 100);
engine.register(WorkflowDef {
name: "retry_test".into(),
description: "test".into(),
file: "test.ts".into(),
max_retries: 2,
step_timeout_secs: 30,
});
let id = engine.start("retry_test", serde_json::json!({})).unwrap();
for i in 0..2 {
let status = engine
.advance_with_response(
&id,
serde_json::json!({
"action": "fail",
"step_name": "flaky",
"error": format!("attempt {i}")
}),
)
.unwrap();
assert_eq!(
status,
WorkflowStatus::Running,
"retry {i} should keep running"
);
}
let status = engine
.advance_with_response(
&id,
serde_json::json!({
"action": "fail",
"step_name": "flaky",
"error": "final"
}),
)
.unwrap();
assert_eq!(status, WorkflowStatus::Failed);
let wf = engine.get(&id).unwrap();
assert_eq!(wf.status, WorkflowStatus::Failed);
assert_eq!(wf.error, Some("final".into()));
}
#[test]
fn workflow_terminal_states_are_idempotent() {
let engine = WorkflowEngine::new("http://localhost:19999", 100);
engine.register(WorkflowDef {
name: "terminal_test".into(),
description: "test".into(),
file: "test.ts".into(),
max_retries: 0,
step_timeout_secs: 30,
});
let id = engine
.start("terminal_test", serde_json::json!({}))
.unwrap();
engine
.advance_with_response(&id, serde_json::json!({"action": "complete", "output": 42}))
.unwrap();
let status = engine
.advance_with_response(
&id,
serde_json::json!({"action": "step_complete", "step_name": "ignored"}),
)
.unwrap();
assert_eq!(status, WorkflowStatus::Completed);
let id2 = engine
.start("terminal_test", serde_json::json!({}))
.unwrap();
engine.cancel(&id2).unwrap();
let status = engine
.advance_with_response(
&id2,
serde_json::json!({"action": "step_complete", "step_name": "ignored"}),
)
.unwrap();
assert_eq!(status, WorkflowStatus::Cancelled);
let id3 = engine
.start("terminal_test", serde_json::json!({}))
.unwrap();
engine
.advance_with_response(
&id3,
serde_json::json!({"action": "fail", "step_name": "s", "error": "boom"}),
)
.unwrap();
let status = engine
.advance_with_response(
&id3,
serde_json::json!({"action": "step_complete", "step_name": "ignored"}),
)
.unwrap();
assert_eq!(status, WorkflowStatus::Failed);
}
#[test]
fn workflow_mixed_action_sequence() {
let engine = WorkflowEngine::new("http://localhost:19999", 100);
engine.register(WorkflowDef {
name: "mixed_test".into(),
description: "test".into(),
file: "test.ts".into(),
max_retries: 1,
step_timeout_secs: 30,
});
let id = engine.start("mixed_test", serde_json::json!({})).unwrap();
let responses = vec![
serde_json::json!({"action": "step_complete", "step_name": "s1", "output": null}),
serde_json::json!({"action": "sleep", "duration": "0s"}), serde_json::json!({"action": "step_complete", "step_name": "s2", "output": "ok"}),
serde_json::json!({"action": "fail", "step_name": "s3", "error": "oops"}),
serde_json::json!({"action": "step_complete", "step_name": "s3", "output": "recovered"}),
serde_json::json!({"action": "complete", "output": {"done": true}}),
];
for resp in responses {
let status = engine.advance_with_response(&id, resp);
assert!(status.is_ok() || status.is_err());
}
let wf = engine.get(&id).unwrap();
assert_eq!(wf.status, WorkflowStatus::Completed);
}
#[test]
fn workflow_malformed_responses_dont_panic() {
let engine = WorkflowEngine::new("http://localhost:19999", 100);
engine.register(WorkflowDef {
name: "malformed_test".into(),
description: "test".into(),
file: "test.ts".into(),
max_retries: 0,
step_timeout_secs: 30,
});
let _id = engine
.start("malformed_test", serde_json::json!({}))
.unwrap();
let malformed_responses = vec![
serde_json::json!({}), serde_json::json!({"action": null}), serde_json::json!({"action": 42}), serde_json::json!({"action": ""}), serde_json::json!({"action": "nonexistent"}), serde_json::json!({"action": "sleep"}), serde_json::json!({"action": "fail"}), serde_json::json!([1, 2, 3]), serde_json::json!("just a string"),
serde_json::json!(42),
];
for resp in malformed_responses {
let test_id = engine
.start("malformed_test", serde_json::json!({}))
.unwrap();
let result = engine.advance_with_response(&test_id, resp);
let _ = result;
}
}