1use chrono::{DateTime, Utc};
20use serde::{Deserialize, Serialize};
21use std::fs;
22use std::io;
23use std::path::Path;
24
25use super::types::{CronPayload, JobState, ScheduleType, ScheduledJob, SessionTarget, WakeMode};
26
27pub const CURRENT_VERSION: u32 = 2;
33
34pub const LEGACY_VERSION: u32 = 1;
36
37#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
66#[serde(rename_all = "camelCase")]
67pub struct LegacyScheduledJob {
68 pub id: String,
70
71 pub cron: String,
73
74 pub source: String,
76
77 #[serde(default)]
79 pub paused: bool,
80
81 #[serde(skip_serializing_if = "Option::is_none")]
83 pub last_run: Option<DateTime<Utc>>,
84}
85
86#[derive(Clone, Debug, Serialize, Deserialize)]
94#[serde(rename_all = "camelCase")]
95pub struct StorageFile {
96 #[serde(default = "default_version")]
98 pub version: u32,
99
100 pub jobs: Vec<ScheduledJob>,
102}
103
104fn default_version() -> u32 {
105 CURRENT_VERSION
106}
107
108impl Default for StorageFile {
109 fn default() -> Self {
110 Self {
111 version: CURRENT_VERSION,
112 jobs: Vec::new(),
113 }
114 }
115}
116
117#[derive(Clone, Debug, Serialize, Deserialize)]
121#[serde(rename_all = "camelCase")]
122pub struct LegacyStorageFile {
123 #[serde(default)]
125 pub version: Option<u32>,
126
127 pub jobs: Vec<LegacyScheduledJob>,
129}
130
131#[derive(Debug, Clone, PartialEq)]
137pub enum StorageVersion {
138 Current,
140 Legacy,
142 Unknown(u32),
144}
145
146pub fn detect_version(path: impl AsRef<Path>) -> io::Result<StorageVersion> {
170 let content = fs::read_to_string(path)?;
171 detect_version_from_str(&content)
172}
173
174pub fn detect_version_from_str(content: &str) -> io::Result<StorageVersion> {
183 #[derive(Deserialize)]
185 struct VersionOnly {
186 #[serde(default)]
187 version: Option<u32>,
188 }
189
190 let version_info: VersionOnly =
191 serde_json::from_str(content).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
192
193 Ok(match version_info.version {
194 Some(v) if v == CURRENT_VERSION => StorageVersion::Current,
195 Some(v) if v == LEGACY_VERSION => StorageVersion::Legacy,
196 Some(v) => StorageVersion::Unknown(v),
197 None => StorageVersion::Legacy, })
199}
200
201pub fn needs_migration(path: impl AsRef<Path>) -> io::Result<bool> {
211 let path = path.as_ref();
212
213 if !path.exists() {
215 return Ok(false);
216 }
217
218 let version = detect_version(path)?;
219 Ok(matches!(version, StorageVersion::Legacy))
220}
221
222pub fn migrate_legacy_job(legacy: &LegacyScheduledJob) -> ScheduledJob {
266 let now_ms = Utc::now().timestamp_millis();
267
268 ScheduledJob {
269 id: legacy.id.clone(),
271 agent_id: None,
272 name: legacy.id.clone(),
274 description: None,
275 enabled: !legacy.paused,
277 delete_after_run: false,
278 created_at_ms: now_ms,
279 updated_at_ms: now_ms,
280 schedule: ScheduleType::from_legacy_cron(&legacy.cron),
282 session_target: SessionTarget::Main,
283 wake_mode: WakeMode::Now,
284 payload: CronPayload::from_legacy_recipe(&legacy.source),
286 isolation: None,
287 delivery: None,
288 state: JobState {
290 last_run_at_ms: legacy.last_run.map(|dt| dt.timestamp_millis()),
291 ..Default::default()
292 },
293 source: Some(legacy.source.clone()),
295 cron: Some(legacy.cron.clone()),
296 }
297}
298
299pub fn migrate_storage_file(path: impl AsRef<Path>) -> io::Result<StorageFile> {
326 let path = path.as_ref();
327 let content = fs::read_to_string(path)?;
328
329 migrate_storage_from_str(&content)
330}
331
332pub fn migrate_storage_from_str(content: &str) -> io::Result<StorageFile> {
341 let version = detect_version_from_str(content)?;
342
343 match version {
344 StorageVersion::Current => {
345 let storage: StorageFile = serde_json::from_str(content)
347 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
348 Ok(storage)
349 }
350 StorageVersion::Legacy => {
351 let legacy: LegacyStorageFile = serde_json::from_str(content)
353 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
354
355 let jobs: Vec<ScheduledJob> = legacy.jobs.iter().map(migrate_legacy_job).collect();
356
357 Ok(StorageFile {
358 version: CURRENT_VERSION,
359 jobs,
360 })
361 }
362 StorageVersion::Unknown(v) => Err(io::Error::new(
363 io::ErrorKind::InvalidData,
364 format!("不支持的存储文件版本: {}", v),
365 )),
366 }
367}
368
369pub fn save_storage_file(path: impl AsRef<Path>, storage: &StorageFile) -> io::Result<()> {
381 let content = serde_json::to_string_pretty(storage)
382 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
383 fs::write(path, content)
384}
385
386pub fn migrate_and_save(path: impl AsRef<Path>) -> io::Result<usize> {
406 let path = path.as_ref();
407
408 if !needs_migration(path)? {
410 return Ok(0);
411 }
412
413 let storage = migrate_storage_file(path)?;
415 let count = storage.jobs.len();
416
417 save_storage_file(path, &storage)?;
419
420 Ok(count)
421}
422
423#[cfg(test)]
428mod tests {
429 use super::*;
430
431 #[test]
436 fn test_legacy_job_deserialize_minimal() {
437 let json = r#"{
438 "id": "test-job",
439 "cron": "0 0 9 * * *",
440 "source": "/path/to/recipe.md"
441 }"#;
442
443 let legacy: LegacyScheduledJob = serde_json::from_str(json).unwrap();
444
445 assert_eq!(legacy.id, "test-job");
446 assert_eq!(legacy.cron, "0 0 9 * * *");
447 assert_eq!(legacy.source, "/path/to/recipe.md");
448 assert!(!legacy.paused); assert!(legacy.last_run.is_none());
450 }
451
452 #[test]
453 fn test_legacy_job_deserialize_full() {
454 let json = r#"{
455 "id": "daily-report",
456 "cron": "0 30 8 * * *",
457 "source": "/home/user/recipes/report.md",
458 "paused": true,
459 "lastRun": "2024-01-15T09:00:00Z"
460 }"#;
461
462 let legacy: LegacyScheduledJob = serde_json::from_str(json).unwrap();
463
464 assert_eq!(legacy.id, "daily-report");
465 assert_eq!(legacy.cron, "0 30 8 * * *");
466 assert_eq!(legacy.source, "/home/user/recipes/report.md");
467 assert!(legacy.paused);
468 assert!(legacy.last_run.is_some());
469 }
470
471 #[test]
472 fn test_legacy_job_serialize() {
473 let legacy = LegacyScheduledJob {
474 id: "test-job".to_string(),
475 cron: "0 0 9 * * *".to_string(),
476 source: "/path/to/recipe.md".to_string(),
477 paused: false,
478 last_run: None,
479 };
480
481 let json = serde_json::to_string(&legacy).unwrap();
482
483 assert!(json.contains("\"id\":\"test-job\""));
484 assert!(json.contains("\"cron\":\"0 0 9 * * *\""));
485 assert!(json.contains("\"source\":\"/path/to/recipe.md\""));
486 }
487
488 #[test]
493 fn test_detect_version_current() {
494 let json = r#"{"version": 2, "jobs": []}"#;
495 let version = detect_version_from_str(json).unwrap();
496 assert_eq!(version, StorageVersion::Current);
497 }
498
499 #[test]
500 fn test_detect_version_legacy_explicit() {
501 let json = r#"{"version": 1, "jobs": []}"#;
502 let version = detect_version_from_str(json).unwrap();
503 assert_eq!(version, StorageVersion::Legacy);
504 }
505
506 #[test]
507 fn test_detect_version_legacy_no_version() {
508 let json = r#"{"jobs": []}"#;
509 let version = detect_version_from_str(json).unwrap();
510 assert_eq!(version, StorageVersion::Legacy);
511 }
512
513 #[test]
514 fn test_detect_version_unknown() {
515 let json = r#"{"version": 99, "jobs": []}"#;
516 let version = detect_version_from_str(json).unwrap();
517 assert_eq!(version, StorageVersion::Unknown(99));
518 }
519
520 #[test]
525 fn test_migrate_legacy_job_basic() {
526 let legacy = LegacyScheduledJob {
527 id: "daily-report".to_string(),
528 cron: "0 0 9 * * *".to_string(),
529 source: "/path/to/recipe.md".to_string(),
530 paused: false,
531 last_run: None,
532 };
533
534 let job = migrate_legacy_job(&legacy);
535
536 assert_eq!(job.id, "daily-report");
538 assert_eq!(job.name, "daily-report");
540 assert!(job.enabled);
542 match &job.schedule {
544 ScheduleType::Cron { expr, tz } => {
545 assert_eq!(expr, "0 0 9 * * *");
546 assert!(tz.is_none());
547 }
548 _ => panic!("Expected Cron schedule type"),
549 }
550 assert_eq!(job.source, Some("/path/to/recipe.md".to_string()));
552 assert_eq!(job.cron, Some("0 0 9 * * *".to_string()));
553 }
554
555 #[test]
556 fn test_migrate_legacy_job_paused() {
557 let legacy = LegacyScheduledJob {
558 id: "paused-job".to_string(),
559 cron: "0 0 12 * * *".to_string(),
560 source: "/path/to/recipe.md".to_string(),
561 paused: true,
562 last_run: None,
563 };
564
565 let job = migrate_legacy_job(&legacy);
566
567 assert!(!job.enabled);
569 }
570
571 #[test]
572 fn test_migrate_legacy_job_with_last_run() {
573 let last_run = Utc::now() - chrono::Duration::hours(1);
574 let legacy = LegacyScheduledJob {
575 id: "job-with-history".to_string(),
576 cron: "0 0 9 * * *".to_string(),
577 source: "/path/to/recipe.md".to_string(),
578 paused: false,
579 last_run: Some(last_run),
580 };
581
582 let job = migrate_legacy_job(&legacy);
583
584 assert_eq!(job.state.last_run_at_ms, Some(last_run.timestamp_millis()));
586 }
587
588 #[test]
589 fn test_migrate_legacy_job_default_values() {
590 let legacy = LegacyScheduledJob {
591 id: "test".to_string(),
592 cron: "0 0 9 * * *".to_string(),
593 source: "/path/to/recipe.md".to_string(),
594 paused: false,
595 last_run: None,
596 };
597
598 let job = migrate_legacy_job(&legacy);
599
600 assert!(job.agent_id.is_none());
602 assert!(job.description.is_none());
603 assert!(!job.delete_after_run);
604 assert_eq!(job.session_target, SessionTarget::Main);
605 assert_eq!(job.wake_mode, WakeMode::Now);
606 assert!(job.isolation.is_none());
607 assert!(job.delivery.is_none());
608 }
609
610 #[test]
611 fn test_migrate_legacy_job_payload() {
612 let legacy = LegacyScheduledJob {
613 id: "test".to_string(),
614 cron: "0 0 9 * * *".to_string(),
615 source: "/home/user/recipes/daily.md".to_string(),
616 paused: false,
617 last_run: None,
618 };
619
620 let job = migrate_legacy_job(&legacy);
621
622 match &job.payload {
624 CronPayload::AgentTurn { message, .. } => {
625 assert_eq!(message, "/home/user/recipes/daily.md");
626 }
627 _ => panic!("Expected AgentTurn payload"),
628 }
629 }
630
631 #[test]
636 fn test_migrate_storage_current_version() {
637 let json = r#"{
638 "version": 2,
639 "jobs": [
640 {
641 "id": "test-job",
642 "name": "Test Job",
643 "enabled": true,
644 "deleteAfterRun": false,
645 "createdAtMs": 1704067200000,
646 "updatedAtMs": 1704067200000,
647 "schedule": {
648 "kind": "cron",
649 "expr": "0 0 9 * * *"
650 },
651 "payload": {
652 "kind": "agentTurn",
653 "message": "Do something"
654 },
655 "state": {}
656 }
657 ]
658 }"#;
659
660 let storage = migrate_storage_from_str(json).unwrap();
661
662 assert_eq!(storage.version, CURRENT_VERSION);
663 assert_eq!(storage.jobs.len(), 1);
664 assert_eq!(storage.jobs[0].id, "test-job");
665 assert_eq!(storage.jobs[0].name, "Test Job");
666 }
667
668 #[test]
669 fn test_migrate_storage_legacy_version() {
670 let json = r#"{
671 "version": 1,
672 "jobs": [
673 {
674 "id": "legacy-job",
675 "cron": "0 0 9 * * *",
676 "source": "/path/to/recipe.md",
677 "paused": false
678 }
679 ]
680 }"#;
681
682 let storage = migrate_storage_from_str(json).unwrap();
683
684 assert_eq!(storage.version, CURRENT_VERSION);
685 assert_eq!(storage.jobs.len(), 1);
686 assert_eq!(storage.jobs[0].id, "legacy-job");
687 assert_eq!(storage.jobs[0].name, "legacy-job");
688 assert!(storage.jobs[0].enabled);
689 }
690
691 #[test]
692 fn test_migrate_storage_no_version() {
693 let json = r#"{
694 "jobs": [
695 {
696 "id": "old-job",
697 "cron": "0 30 8 * * *",
698 "source": "/path/to/old-recipe.md",
699 "paused": true
700 }
701 ]
702 }"#;
703
704 let storage = migrate_storage_from_str(json).unwrap();
705
706 assert_eq!(storage.version, CURRENT_VERSION);
707 assert_eq!(storage.jobs.len(), 1);
708 assert_eq!(storage.jobs[0].id, "old-job");
709 assert!(!storage.jobs[0].enabled); }
711
712 #[test]
713 fn test_migrate_storage_multiple_jobs() {
714 let json = r#"{
715 "jobs": [
716 {
717 "id": "job-1",
718 "cron": "0 0 9 * * *",
719 "source": "/path/to/recipe1.md",
720 "paused": false
721 },
722 {
723 "id": "job-2",
724 "cron": "0 0 18 * * *",
725 "source": "/path/to/recipe2.md",
726 "paused": true
727 }
728 ]
729 }"#;
730
731 let storage = migrate_storage_from_str(json).unwrap();
732
733 assert_eq!(storage.version, CURRENT_VERSION);
734 assert_eq!(storage.jobs.len(), 2);
735
736 assert_eq!(storage.jobs[0].id, "job-1");
737 assert!(storage.jobs[0].enabled);
738
739 assert_eq!(storage.jobs[1].id, "job-2");
740 assert!(!storage.jobs[1].enabled);
741 }
742
743 #[test]
744 fn test_migrate_storage_unknown_version() {
745 let json = r#"{"version": 99, "jobs": []}"#;
746
747 let result = migrate_storage_from_str(json);
748
749 assert!(result.is_err());
750 let err = result.unwrap_err();
751 assert!(err.to_string().contains("不支持的存储文件版本"));
752 }
753
754 #[test]
755 fn test_migrate_storage_empty_jobs() {
756 let json = r#"{"jobs": []}"#;
757
758 let storage = migrate_storage_from_str(json).unwrap();
759
760 assert_eq!(storage.version, CURRENT_VERSION);
761 assert!(storage.jobs.is_empty());
762 }
763
764 #[test]
769 fn test_storage_file_default() {
770 let storage = StorageFile::default();
771
772 assert_eq!(storage.version, CURRENT_VERSION);
773 assert!(storage.jobs.is_empty());
774 }
775
776 #[test]
777 fn test_storage_file_serialize() {
778 let storage = StorageFile {
779 version: CURRENT_VERSION,
780 jobs: vec![],
781 };
782
783 let json = serde_json::to_string(&storage).unwrap();
784
785 assert!(json.contains(&format!("\"version\":{}", CURRENT_VERSION)));
786 assert!(json.contains("\"jobs\":[]"));
787 }
788}
789
790#[cfg(test)]
795mod property_tests {
796 use super::*;
797 use proptest::prelude::*;
798
799 fn arb_job_id() -> impl Strategy<Value = String> {
807 "[a-z][a-z0-9-]{0,30}".prop_filter("非空 ID", |s| !s.is_empty())
808 }
809
810 fn arb_valid_cron_expr() -> impl Strategy<Value = String> {
814 prop_oneof![
815 Just("0 0 9 * * *".to_string()), Just("0 30 8 * * *".to_string()), Just("0 0 12 * * *".to_string()), Just("0 */5 * * * *".to_string()), Just("0 0 0 * * 1".to_string()), Just("0 0 18 * * *".to_string()), Just("0 15 10 * * *".to_string()), Just("0 0 */2 * * *".to_string()), ]
824 }
825
826 fn arb_source_path() -> impl Strategy<Value = String> {
828 prop_oneof![
829 Just("/path/to/recipe.md".to_string()),
830 Just("/home/user/recipes/daily.md".to_string()),
831 Just("recipes/report.md".to_string()),
832 Just("/var/aster/tasks/backup.md".to_string()),
833 "[a-z/]{5,50}\\.md".prop_filter("有效路径", |s| !s.is_empty()),
834 ]
835 }
836
837 fn arb_last_run() -> impl Strategy<Value = Option<DateTime<Utc>>> {
839 prop_oneof![
840 Just(None),
841 (1i64..2592000i64)
843 .prop_map(|secs| { Some(Utc::now() - chrono::Duration::seconds(secs)) }),
844 ]
845 }
846
847 fn arb_legacy_job() -> impl Strategy<Value = LegacyScheduledJob> {
849 (
850 arb_job_id(),
851 arb_valid_cron_expr(),
852 arb_source_path(),
853 proptest::bool::ANY,
854 arb_last_run(),
855 )
856 .prop_map(|(id, cron, source, paused, last_run)| LegacyScheduledJob {
857 id,
858 cron,
859 source,
860 paused,
861 last_run,
862 })
863 }
864
865 fn arb_legacy_jobs() -> impl Strategy<Value = Vec<LegacyScheduledJob>> {
867 prop::collection::vec(arb_legacy_job(), 0..10)
868 }
869
870 proptest! {
875 #![proptest_config(ProptestConfig::with_cases(100))]
876
877 #[test]
884 fn prop_migration_schedule_is_cron(legacy in arb_legacy_job()) {
885 let job = migrate_legacy_job(&legacy);
886
887 match &job.schedule {
888 ScheduleType::Cron { expr, tz } => {
889 prop_assert_eq!(expr, &legacy.cron);
891 prop_assert!(tz.is_none());
893 }
894 _ => prop_assert!(false, "迁移后 schedule 应为 Cron 类型"),
895 }
896 }
897
898 #[test]
904 fn prop_migration_preserves_job_id(legacy in arb_legacy_job()) {
905 let job = migrate_legacy_job(&legacy);
906
907 prop_assert_eq!(
908 &job.id,
909 &legacy.id,
910 "迁移后 job ID 应保持不变"
911 );
912 }
913
914 #[test]
920 fn prop_migration_applies_default_values(legacy in arb_legacy_job()) {
921 let job = migrate_legacy_job(&legacy);
922
923 prop_assert!(job.agent_id.is_none(), "agent_id 应为 None");
925 prop_assert!(job.description.is_none(), "description 应为 None");
926 prop_assert!(!job.delete_after_run, "delete_after_run 应为 false");
927 prop_assert_eq!(job.session_target, SessionTarget::Main, "session_target 应为 Main");
928 prop_assert_eq!(job.wake_mode, WakeMode::Now, "wake_mode 应为 Now");
929 prop_assert!(job.isolation.is_none(), "isolation 应为 None");
930 prop_assert!(job.delivery.is_none(), "delivery 应为 None");
931
932 prop_assert!(job.created_at_ms > 0, "created_at_ms 应大于 0");
934 prop_assert!(job.updated_at_ms > 0, "updated_at_ms 应大于 0");
935 }
936
937 #[test]
943 fn prop_migration_enabled_inverse_of_paused(legacy in arb_legacy_job()) {
944 let job = migrate_legacy_job(&legacy);
945
946 prop_assert_eq!(
947 job.enabled,
948 !legacy.paused,
949 "enabled 应与 paused 相反"
950 );
951 }
952
953 #[test]
959 fn prop_migration_name_uses_id(legacy in arb_legacy_job()) {
960 let job = migrate_legacy_job(&legacy);
961
962 prop_assert_eq!(
963 &job.name,
964 &legacy.id,
965 "name 应使用原始 ID"
966 );
967 }
968
969 #[test]
975 fn prop_migration_preserves_legacy_fields(legacy in arb_legacy_job()) {
976 let job = migrate_legacy_job(&legacy);
977
978 prop_assert_eq!(
979 job.source,
980 Some(legacy.source.clone()),
981 "source 字段应保留"
982 );
983 prop_assert_eq!(
984 job.cron,
985 Some(legacy.cron.clone()),
986 "cron 字段应保留"
987 );
988 }
989
990 #[test]
996 fn prop_migration_payload_is_agent_turn(legacy in arb_legacy_job()) {
997 let job = migrate_legacy_job(&legacy);
998
999 prop_assert!(
1000 job.payload.is_agent_turn(),
1001 "payload 应为 AgentTurn 类型"
1002 );
1003 prop_assert_eq!(
1004 job.payload.get_text(),
1005 &legacy.source,
1006 "payload message 应为原始 source"
1007 );
1008 }
1009
1010 #[test]
1016 fn prop_migration_last_run_converted(legacy in arb_legacy_job()) {
1017 let job = migrate_legacy_job(&legacy);
1018
1019 match legacy.last_run {
1020 Some(last_run) => {
1021 prop_assert_eq!(
1022 job.state.last_run_at_ms,
1023 Some(last_run.timestamp_millis()),
1024 "last_run 应正确转换为毫秒时间戳"
1025 );
1026 }
1027 None => {
1028 prop_assert!(
1029 job.state.last_run_at_ms.is_none(),
1030 "无 last_run 时 state.last_run_at_ms 应为 None"
1031 );
1032 }
1033 }
1034 }
1035
1036 #[test]
1042 fn prop_migration_preserves_all_jobs(jobs in arb_legacy_jobs()) {
1043 let legacy_storage = LegacyStorageFile {
1044 version: Some(LEGACY_VERSION),
1045 jobs: jobs.clone(),
1046 };
1047
1048 let json = serde_json::to_string(&legacy_storage).unwrap();
1049 let migrated = migrate_storage_from_str(&json).unwrap();
1050
1051 prop_assert_eq!(
1052 migrated.jobs.len(),
1053 jobs.len(),
1054 "迁移后任务数量应保持不变"
1055 );
1056
1057 for (i, legacy_job) in jobs.iter().enumerate() {
1059 prop_assert_eq!(
1060 &migrated.jobs[i].id,
1061 &legacy_job.id,
1062 "任务 {} 的 ID 应保持不变",
1063 i
1064 );
1065 }
1066 }
1067
1068 #[test]
1074 fn prop_migration_updates_version(jobs in arb_legacy_jobs()) {
1075 let legacy_storage = LegacyStorageFile {
1076 version: Some(LEGACY_VERSION),
1077 jobs,
1078 };
1079
1080 let json = serde_json::to_string(&legacy_storage).unwrap();
1081 let migrated = migrate_storage_from_str(&json).unwrap();
1082
1083 prop_assert_eq!(
1084 migrated.version,
1085 CURRENT_VERSION,
1086 "迁移后版本号应为 CURRENT_VERSION"
1087 );
1088 }
1089
1090 #[test]
1096 fn prop_migration_handles_no_version(jobs in arb_legacy_jobs()) {
1097 let legacy_storage = LegacyStorageFile {
1098 version: None, jobs: jobs.clone(),
1100 };
1101
1102 let json = serde_json::to_string(&legacy_storage).unwrap();
1103 let migrated = migrate_storage_from_str(&json).unwrap();
1104
1105 prop_assert_eq!(
1106 migrated.version,
1107 CURRENT_VERSION,
1108 "无版本字段时迁移后版本号应为 CURRENT_VERSION"
1109 );
1110 prop_assert_eq!(
1111 migrated.jobs.len(),
1112 jobs.len(),
1113 "无版本字段时任务数量应保持不变"
1114 );
1115 }
1116 }
1117}