1use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20use std::time::SystemTime;
21use tokio::sync::RwLock;
22
23use crate::error::StreamError;
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27pub enum SourcePlatform {
28 KafkaStreams,
30 Flink,
32 SparkStreaming,
34 Storm,
36 PulsarFunctions,
38 KinesisAnalytics,
40 Dataflow,
42 Beam,
44 Custom(String),
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct MigrationConfig {
51 pub source_platform: SourcePlatform,
53 pub source_dir: PathBuf,
55 pub output_dir: PathBuf,
57 pub generate_wrappers: bool,
59 pub preserve_comments: bool,
61 pub generate_tests: bool,
63 pub rust_edition: String,
65 pub extra_dependencies: Vec<String>,
67}
68
69impl Default for MigrationConfig {
70 fn default() -> Self {
71 Self {
72 source_platform: SourcePlatform::KafkaStreams,
73 source_dir: PathBuf::from("./source"),
74 output_dir: PathBuf::from("./migrated"),
75 generate_wrappers: true,
76 preserve_comments: true,
77 generate_tests: true,
78 rust_edition: "2021".to_string(),
79 extra_dependencies: Vec::new(),
80 }
81 }
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct MigrationReport {
87 pub report_id: String,
89 pub source_platform: SourcePlatform,
91 pub timestamp: SystemTime,
93 pub files_processed: usize,
95 pub lines_converted: usize,
97 pub successful: usize,
99 pub failed: usize,
101 pub warnings: Vec<MigrationWarning>,
103 pub errors: Vec<MigrationError>,
105 pub generated_files: Vec<GeneratedFile>,
107 pub manual_review_items: Vec<ManualReviewItem>,
109 pub suggestions: Vec<MigrationSuggestion>,
111 pub compatibility_score: f64,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct MigrationWarning {
118 pub code: String,
120 pub message: String,
122 pub file: Option<PathBuf>,
124 pub line: Option<usize>,
126 pub suggestion: Option<String>,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct MigrationError {
133 pub code: String,
135 pub message: String,
137 pub file: Option<PathBuf>,
139 pub line: Option<usize>,
141 pub recoverable: bool,
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct GeneratedFile {
148 pub path: PathBuf,
150 pub file_type: GeneratedFileType,
152 pub lines: usize,
154 pub source_file: Option<PathBuf>,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
160pub enum GeneratedFileType {
161 Source,
163 Wrapper,
165 Test,
167 Config,
169 Documentation,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct ManualReviewItem {
176 pub id: String,
178 pub description: String,
180 pub file: PathBuf,
182 pub line_range: (usize, usize),
184 pub priority: ReviewPriority,
186 pub reason: String,
188 pub suggestion: String,
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq)]
194pub enum ReviewPriority {
195 Low,
197 Medium,
199 High,
201 Critical,
203}
204
205#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct MigrationSuggestion {
208 pub category: SuggestionCategory,
210 pub title: String,
212 pub description: String,
214 pub example: Option<String>,
216 pub references: Vec<String>,
218}
219
220#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
222pub enum SuggestionCategory {
223 Performance,
225 CodeStyle,
227 BestPractice,
229 Security,
231 RustIdiom,
233}
234
235#[derive(Debug, Clone, Serialize, Deserialize)]
237pub struct ConceptMapping {
238 pub source_name: String,
240 pub target_name: String,
242 pub description: String,
244 pub pattern: Option<String>,
246 pub source_example: Option<String>,
248 pub target_example: Option<String>,
250}
251
252#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct APIMapping {
255 pub source_api: String,
257 pub target_api: String,
259 pub param_mappings: HashMap<String, String>,
261 pub return_type_mapping: Option<String>,
263 pub notes: String,
265}
266
267pub struct MigrationTool {
269 config: MigrationConfig,
271 concept_mappings: Arc<RwLock<Vec<ConceptMapping>>>,
273 api_mappings: Arc<RwLock<Vec<APIMapping>>>,
275 stats: Arc<RwLock<MigrationStats>>,
277}
278
279#[derive(Debug, Clone, Default, Serialize, Deserialize)]
281pub struct MigrationStats {
282 pub total_migrations: u64,
284 pub successful_migrations: u64,
286 pub avg_compatibility_score: f64,
288 pub total_lines_converted: u64,
290 pub total_files_processed: u64,
292}
293
294impl MigrationTool {
295 pub fn new(config: MigrationConfig) -> Self {
298 Self {
299 config: config.clone(),
300 concept_mappings: Arc::new(RwLock::new(Vec::new())),
301 api_mappings: Arc::new(RwLock::new(Vec::new())),
302 stats: Arc::new(RwLock::new(MigrationStats::default())),
303 }
304 }
305
306 pub async fn analyze(&self) -> Result<MigrationReport, StreamError> {
308 let mut report = MigrationReport {
309 report_id: uuid::Uuid::new_v4().to_string(),
310 source_platform: self.config.source_platform.clone(),
311 timestamp: SystemTime::now(),
312 files_processed: 0,
313 lines_converted: 0,
314 successful: 0,
315 failed: 0,
316 warnings: Vec::new(),
317 errors: Vec::new(),
318 generated_files: Vec::new(),
319 manual_review_items: Vec::new(),
320 suggestions: Vec::new(),
321 compatibility_score: 0.0,
322 };
323
324 let source_files = self.scan_source_directory().await?;
326 report.files_processed = source_files.len();
327
328 for file_path in &source_files {
330 match self.analyze_file(file_path).await {
331 Ok(analysis) => {
332 report.successful += 1;
333 report.lines_converted += analysis.lines;
334 report.warnings.extend(analysis.warnings);
335 report.manual_review_items.extend(analysis.review_items);
336 }
337 Err(e) => {
338 report.failed += 1;
339 report.errors.push(MigrationError {
340 code: "ANALYSIS_ERROR".to_string(),
341 message: e.to_string(),
342 file: Some(file_path.to_path_buf()),
343 line: None,
344 recoverable: false,
345 });
346 }
347 }
348 }
349
350 report.suggestions = self.generate_suggestions(&report).await;
352
353 report.compatibility_score = self.calculate_compatibility_score(&report);
355
356 Ok(report)
357 }
358
359 pub async fn migrate(&self) -> Result<MigrationReport, StreamError> {
361 let mut report = self.analyze().await?;
362
363 if !self.config.output_dir.exists() {
365 std::fs::create_dir_all(&self.config.output_dir).map_err(|e| {
366 StreamError::Io(format!("Failed to create output directory: {}", e))
367 })?;
368 }
369
370 let cargo_toml = self.generate_cargo_toml().await;
372 let cargo_path = self.config.output_dir.join("Cargo.toml");
373 std::fs::write(&cargo_path, cargo_toml)
374 .map_err(|e| StreamError::Io(format!("Failed to write Cargo.toml: {}", e)))?;
375
376 report.generated_files.push(GeneratedFile {
377 path: cargo_path,
378 file_type: GeneratedFileType::Config,
379 lines: 30,
380 source_file: None,
381 });
382
383 let lib_rs = self.generate_lib_rs().await;
385 let lib_path = self.config.output_dir.join("src").join("lib.rs");
386 std::fs::create_dir_all(self.config.output_dir.join("src")).ok();
387 std::fs::write(&lib_path, lib_rs)
388 .map_err(|e| StreamError::Io(format!("Failed to write lib.rs: {}", e)))?;
389
390 report.generated_files.push(GeneratedFile {
391 path: lib_path,
392 file_type: GeneratedFileType::Source,
393 lines: 50,
394 source_file: None,
395 });
396
397 if self.config.generate_wrappers {
399 let wrapper = self.generate_compatibility_wrapper().await;
400 let wrapper_path = self.config.output_dir.join("src").join("compat.rs");
401 std::fs::write(&wrapper_path, wrapper)
402 .map_err(|e| StreamError::Io(format!("Failed to write compat.rs: {}", e)))?;
403
404 report.generated_files.push(GeneratedFile {
405 path: wrapper_path,
406 file_type: GeneratedFileType::Wrapper,
407 lines: 200,
408 source_file: None,
409 });
410 }
411
412 if self.config.generate_tests {
414 let tests = self.generate_tests().await;
415 let test_path = self.config.output_dir.join("tests").join("integration.rs");
416 std::fs::create_dir_all(self.config.output_dir.join("tests")).ok();
417 std::fs::write(&test_path, tests)
418 .map_err(|e| StreamError::Io(format!("Failed to write tests: {}", e)))?;
419
420 report.generated_files.push(GeneratedFile {
421 path: test_path,
422 file_type: GeneratedFileType::Test,
423 lines: 100,
424 source_file: None,
425 });
426 }
427
428 let mut stats = self.stats.write().await;
430 stats.total_migrations += 1;
431 stats.successful_migrations += 1;
432 stats.total_files_processed += report.files_processed as u64;
433 stats.total_lines_converted += report.lines_converted as u64;
434 stats.avg_compatibility_score = (stats.avg_compatibility_score
435 * (stats.total_migrations - 1) as f64
436 + report.compatibility_score)
437 / stats.total_migrations as f64;
438
439 Ok(report)
440 }
441
442 pub async fn get_concept_mappings(&self) -> Vec<ConceptMapping> {
444 self.concept_mappings.read().await.clone()
445 }
446
447 pub async fn get_api_mappings(&self) -> Vec<APIMapping> {
449 self.api_mappings.read().await.clone()
450 }
451
452 pub async fn add_concept_mapping(&self, mapping: ConceptMapping) {
454 let mut mappings = self.concept_mappings.write().await;
455 mappings.push(mapping);
456 }
457
458 pub async fn add_api_mapping(&self, mapping: APIMapping) {
460 let mut mappings = self.api_mappings.write().await;
461 mappings.push(mapping);
462 }
463
464 pub async fn get_stats(&self) -> MigrationStats {
466 self.stats.read().await.clone()
467 }
468
469 pub async fn generate_guide(&self) -> String {
471 let mut guide = String::new();
472
473 guide.push_str(&format!(
474 "# Migration Guide: {} to oxirs-stream\n\n",
475 self.platform_name()
476 ));
477
478 guide.push_str("## Overview\n\n");
479 guide.push_str(
480 "This guide helps you migrate your streaming application to oxirs-stream.\n\n",
481 );
482
483 guide.push_str("## Key Concepts\n\n");
484
485 let mappings = self.concept_mappings.read().await;
486 for mapping in mappings.iter() {
487 guide.push_str(&format!(
488 "### {} → {}\n\n{}\n\n",
489 mapping.source_name, mapping.target_name, mapping.description
490 ));
491
492 if let Some(ref source) = mapping.source_example {
493 guide.push_str("**Before:**\n```\n");
494 guide.push_str(source);
495 guide.push_str("\n```\n\n");
496 }
497
498 if let Some(ref target) = mapping.target_example {
499 guide.push_str("**After:**\n```rust\n");
500 guide.push_str(target);
501 guide.push_str("\n```\n\n");
502 }
503 }
504
505 guide.push_str("## API Reference\n\n");
506 guide.push_str("| Source API | oxirs-stream API | Notes |\n");
507 guide.push_str("|------------|------------------|-------|\n");
508
509 let api_mappings = self.api_mappings.read().await;
510 for mapping in api_mappings.iter() {
511 guide.push_str(&format!(
512 "| `{}` | `{}` | {} |\n",
513 mapping.source_api, mapping.target_api, mapping.notes
514 ));
515 }
516
517 guide.push_str("\n## Next Steps\n\n");
518 guide.push_str("1. Review the generated code\n");
519 guide.push_str("2. Address manual review items\n");
520 guide.push_str("3. Run the test suite\n");
521 guide.push_str("4. Benchmark performance\n");
522 guide.push_str("5. Deploy gradually with feature flags\n");
523
524 guide
525 }
526
527 async fn load_default_mappings(&mut self) {
530 let mut concept_mappings = self.concept_mappings.write().await;
531 let mut api_mappings = self.api_mappings.write().await;
532
533 match self.config.source_platform {
534 SourcePlatform::KafkaStreams => {
535 concept_mappings.push(ConceptMapping {
537 source_name: "KStream".to_string(),
538 target_name: "Stream".to_string(),
539 description: "Unbounded stream of records".to_string(),
540 pattern: Some("stream!".to_string()),
541 source_example: Some("KStream<String, String> stream = builder.stream(\"topic\");".to_string()),
542 target_example: Some("let stream = StreamBuilder::new()\n .source(KafkaSource::new(\"topic\"))\n .build();".to_string()),
543 });
544
545 concept_mappings.push(ConceptMapping {
546 source_name: "KTable".to_string(),
547 target_name: "StateStore".to_string(),
548 description: "Changelog stream / table".to_string(),
549 pattern: None,
550 source_example: Some(
551 "KTable<String, Long> table = builder.table(\"topic\");".to_string(),
552 ),
553 target_example: Some(
554 "let state = StateStore::new(\"table\")\n .with_changelog(\"topic\");"
555 .to_string(),
556 ),
557 });
558
559 api_mappings.push(APIMapping {
561 source_api: "stream.mapValues()".to_string(),
562 target_api: "stream.map()".to_string(),
563 param_mappings: HashMap::new(),
564 return_type_mapping: None,
565 notes: "Use map with tuple destructuring".to_string(),
566 });
567
568 api_mappings.push(APIMapping {
569 source_api: "stream.filter()".to_string(),
570 target_api: "stream.filter()".to_string(),
571 param_mappings: HashMap::new(),
572 return_type_mapping: None,
573 notes: "Direct equivalent".to_string(),
574 });
575
576 api_mappings.push(APIMapping {
577 source_api: "stream.groupByKey()".to_string(),
578 target_api: "stream.group_by_key()".to_string(),
579 param_mappings: HashMap::new(),
580 return_type_mapping: None,
581 notes: "Similar semantics".to_string(),
582 });
583
584 api_mappings.push(APIMapping {
585 source_api: "stream.windowedBy()".to_string(),
586 target_api: "stream.window()".to_string(),
587 param_mappings: {
588 let mut map = HashMap::new();
589 map.insert(
590 "TimeWindows.of()".to_string(),
591 "TumblingWindow::new()".to_string(),
592 );
593 map.insert(
594 "SlidingWindows.of()".to_string(),
595 "SlidingWindow::new()".to_string(),
596 );
597 map
598 },
599 return_type_mapping: None,
600 notes: "Window types map directly".to_string(),
601 });
602 }
603
604 SourcePlatform::Flink => {
605 concept_mappings.push(ConceptMapping {
607 source_name: "DataStream".to_string(),
608 target_name: "Stream".to_string(),
609 description: "Core streaming abstraction".to_string(),
610 pattern: None,
611 source_example: Some(
612 "DataStream<String> stream = env.addSource(source);".to_string(),
613 ),
614 target_example: Some(
615 "let stream = StreamBuilder::new().source(source).build();".to_string(),
616 ),
617 });
618
619 concept_mappings.push(ConceptMapping {
620 source_name: "KeyedStream".to_string(),
621 target_name: "GroupedStream".to_string(),
622 description: "Partitioned stream by key".to_string(),
623 pattern: None,
624 source_example: None,
625 target_example: None,
626 });
627
628 api_mappings.push(APIMapping {
629 source_api: "stream.keyBy()".to_string(),
630 target_api: "stream.key_by()".to_string(),
631 param_mappings: HashMap::new(),
632 return_type_mapping: None,
633 notes: "Use closure for key extraction".to_string(),
634 });
635
636 api_mappings.push(APIMapping {
637 source_api: "stream.process()".to_string(),
638 target_api: "stream.process()".to_string(),
639 param_mappings: HashMap::new(),
640 return_type_mapping: None,
641 notes: "Implement ProcessFunction trait".to_string(),
642 });
643 }
644
645 SourcePlatform::SparkStreaming => {
646 concept_mappings.push(ConceptMapping {
648 source_name: "DStream".to_string(),
649 target_name: "Stream".to_string(),
650 description: "Discretized stream".to_string(),
651 pattern: None,
652 source_example: Some("val stream = ssc.socketTextStream(host, port)".to_string()),
653 target_example: Some("let stream = StreamBuilder::new()\n .source(TcpSource::new(host, port))\n .build();".to_string()),
654 });
655
656 api_mappings.push(APIMapping {
657 source_api: "stream.transform()".to_string(),
658 target_api: "stream.map()".to_string(),
659 param_mappings: HashMap::new(),
660 return_type_mapping: None,
661 notes: "Use map for transformations".to_string(),
662 });
663
664 api_mappings.push(APIMapping {
665 source_api: "stream.foreachRDD()".to_string(),
666 target_api: "stream.for_each()".to_string(),
667 param_mappings: HashMap::new(),
668 return_type_mapping: None,
669 notes: "Processes each micro-batch".to_string(),
670 });
671 }
672
673 _ => {
674 concept_mappings.push(ConceptMapping {
676 source_name: "Stream".to_string(),
677 target_name: "Stream".to_string(),
678 description: "Core streaming abstraction".to_string(),
679 pattern: None,
680 source_example: None,
681 target_example: None,
682 });
683 }
684 }
685 }
686
687 async fn scan_source_directory(&self) -> Result<Vec<PathBuf>, StreamError> {
688 let mut files = Vec::new();
689
690 if !self.config.source_dir.exists() {
691 return Ok(files);
692 }
693
694 let extension = match self.config.source_platform {
695 SourcePlatform::KafkaStreams | SourcePlatform::Flink | SourcePlatform::Storm => "java",
696 SourcePlatform::SparkStreaming => "scala",
697 SourcePlatform::PulsarFunctions => "java",
698 SourcePlatform::KinesisAnalytics | SourcePlatform::Dataflow | SourcePlatform::Beam => {
699 "java"
700 }
701 SourcePlatform::Custom(_) => "java",
702 };
703
704 Self::scan_directory_recursive(&self.config.source_dir, extension, &mut files)?;
705
706 Ok(files)
707 }
708
709 fn scan_directory_recursive(
710 dir: &Path,
711 extension: &str,
712 files: &mut Vec<PathBuf>,
713 ) -> Result<(), StreamError> {
714 if dir.is_dir() {
715 for entry in std::fs::read_dir(dir)
716 .map_err(|e| StreamError::Io(format!("Failed to read directory: {}", e)))?
717 {
718 let entry =
719 entry.map_err(|e| StreamError::Io(format!("Failed to read entry: {}", e)))?;
720 let path = entry.path();
721
722 if path.is_dir() {
723 Self::scan_directory_recursive(&path, extension, files)?;
724 } else if path.extension().map(|e| e == extension).unwrap_or(false) {
725 files.push(path);
726 }
727 }
728 }
729
730 Ok(())
731 }
732
733 async fn analyze_file(&self, file_path: &Path) -> Result<FileAnalysis, StreamError> {
734 Ok(FileAnalysis {
737 lines: 100,
738 warnings: vec![MigrationWarning {
739 code: "DEPRECATED_API".to_string(),
740 message: "Some APIs may need manual review".to_string(),
741 file: Some(file_path.to_path_buf()),
742 line: None,
743 suggestion: Some("Check API mappings".to_string()),
744 }],
745 review_items: vec![],
746 })
747 }
748
749 async fn generate_suggestions(&self, report: &MigrationReport) -> Vec<MigrationSuggestion> {
750 let mut suggestions = Vec::new();
751
752 suggestions.push(MigrationSuggestion {
754 category: SuggestionCategory::Performance,
755 title: "Use async/await for I/O operations".to_string(),
756 description: "oxirs-stream is built on Tokio async runtime. Ensure all I/O operations use async methods.".to_string(),
757 example: Some("async fn process(event: Event) -> Result<Output, Error> {\n // Use .await for async operations\n}".to_string()),
758 references: vec!["https://tokio.rs/".to_string()],
759 });
760
761 suggestions.push(MigrationSuggestion {
763 category: SuggestionCategory::BestPractice,
764 title: "Use structured error handling".to_string(),
765 description: "Replace exceptions with Result types for better error propagation.".to_string(),
766 example: Some("fn process() -> Result<(), StreamError> {\n // Return errors instead of throwing\n}".to_string()),
767 references: vec![],
768 });
769
770 if report.files_processed > 0 {
772 suggestions.push(MigrationSuggestion {
773 category: SuggestionCategory::RustIdiom,
774 title: "Use iterators instead of loops".to_string(),
775 description:
776 "Rust iterators are often more performant and idiomatic than explicit loops."
777 .to_string(),
778 example: Some("let sum: i32 = values.iter().map(|x| x * 2).sum();".to_string()),
779 references: vec![],
780 });
781 }
782
783 suggestions
784 }
785
786 fn calculate_compatibility_score(&self, report: &MigrationReport) -> f64 {
787 if report.files_processed == 0 {
788 return 100.0;
789 }
790
791 let success_rate = report.successful as f64 / report.files_processed as f64;
792 let warning_penalty = (report.warnings.len() as f64 * 2.0).min(20.0);
793 let error_penalty = (report.errors.len() as f64 * 5.0).min(50.0);
794 let review_penalty = (report.manual_review_items.len() as f64).min(10.0);
795
796 (success_rate * 100.0 - warning_penalty - error_penalty - review_penalty).max(0.0)
797 }
798
799 async fn generate_cargo_toml(&self) -> String {
800 format!(
801 r#"[package]
802name = "migrated-stream"
803version = "0.1.0"
804edition = "{}"
805
806[dependencies]
807oxirs-stream = "0.1"
808tokio = {{ version = "1", features = ["full"] }}
809serde = {{ version = "1", features = ["derive"] }}
810serde_json = "1"
811{}
812"#,
813 self.config.rust_edition,
814 self.config.extra_dependencies.join("\n")
815 )
816 }
817
818 async fn generate_lib_rs(&self) -> String {
819 format!(
820 r#"//! Migrated streaming application from {}
821//! Generated by oxirs-stream migration tool
822
823{}
824pub mod compat;
825
826pub use oxirs_stream::prelude::*;
827
828// Your migrated stream processors go here
829"#,
830 self.platform_name(),
831 if self.config.generate_wrappers {
832 ""
833 } else {
834 "// Compatibility wrappers disabled\n"
835 }
836 )
837 }
838
839 async fn generate_compatibility_wrapper(&self) -> String {
840 match self.config.source_platform {
841 SourcePlatform::KafkaStreams => r#"//! Compatibility wrappers for Kafka Streams API
842
843use oxirs_stream::prelude::*;
844
845/// KStream-like wrapper for familiar API
846pub struct KStreamCompat<K, V> {
847 inner: Stream<(K, V)>,
848}
849
850impl<K, V> KStreamCompat<K, V>
851where
852 K: Clone + Send + Sync + 'static,
853 V: Clone + Send + Sync + 'static,
854{
855 pub fn new(stream: Stream<(K, V)>) -> Self {
856 Self { inner: stream }
857 }
858
859 pub fn map_values<F, V2>(self, f: F) -> KStreamCompat<K, V2>
860 where
861 F: Fn(V) -> V2 + Send + Sync + Clone + 'static,
862 V2: Clone + Send + Sync + 'static,
863 {
864 KStreamCompat {
865 inner: self.inner.map(move |(k, v)| (k, f(v))),
866 }
867 }
868
869 pub fn filter<F>(self, predicate: F) -> Self
870 where
871 F: Fn(&K, &V) -> bool + Send + Sync + Clone + 'static,
872 {
873 Self {
874 inner: self.inner.filter(move |(k, v)| predicate(k, v)),
875 }
876 }
877}
878
879/// KTable-like wrapper for changelog streams and materialized views
880pub struct KTableCompat<K, V> {
881 store: StateStore<K, V>,
882}
883
884impl<K, V> KTableCompat<K, V>
885where
886 K: Clone + Send + Sync + 'static,
887 V: Clone + Send + Sync + 'static,
888{
889 pub fn new(store: StateStore<K, V>) -> Self {
890 Self { store }
891 }
892
893 pub fn map_values<F, V2>(self, f: F) -> KTableCompat<K, V2>
894 where
895 F: Fn(V) -> V2 + Send + Sync + Clone + 'static,
896 V2: Clone + Send + Sync + 'static,
897 {
898 KTableCompat {
899 store: self.store.map_values(f),
900 }
901 }
902
903 pub fn filter<F>(self, predicate: F) -> Self
904 where
905 F: Fn(&K, &V) -> bool + Send + Sync + Clone + 'static,
906 {
907 Self {
908 store: self.store.filter(predicate),
909 }
910 }
911
912 pub fn group_by<F, K2>(self, key_selector: F) -> KTableCompat<K2, Vec<V>>
913 where
914 K2: Clone + Send + Sync + 'static,
915 F: Fn(&K, &V) -> K2 + Send + Sync + Clone + 'static,
916 {
917 KTableCompat {
918 store: self.store.group_by(key_selector),
919 }
920 }
921}
922"#
923 .to_string(),
924 SourcePlatform::Flink => r#"//! Compatibility wrappers for Flink API
925
926use oxirs_stream::prelude::*;
927
928/// DataStream-like wrapper
929pub struct DataStreamCompat<T> {
930 inner: Stream<T>,
931}
932
933impl<T> DataStreamCompat<T>
934where
935 T: Clone + Send + Sync + 'static,
936{
937 pub fn new(stream: Stream<T>) -> Self {
938 Self { inner: stream }
939 }
940
941 pub fn key_by<K, F>(self, key_selector: F) -> KeyedStreamCompat<K, T>
942 where
943 K: Clone + Send + Sync + 'static,
944 F: Fn(&T) -> K + Send + Sync + Clone + 'static,
945 {
946 KeyedStreamCompat {
947 inner: self.inner.key_by(key_selector),
948 }
949 }
950}
951
952/// KeyedStream-like wrapper
953pub struct KeyedStreamCompat<K, T> {
954 inner: GroupedStream<K, T>,
955}
956"#
957 .to_string(),
958 _ => r#"//! Generic compatibility wrappers
959
960use oxirs_stream::prelude::*;
961
962// Add platform-specific wrappers as needed
963"#
964 .to_string(),
965 }
966 }
967
968 async fn generate_tests(&self) -> String {
969 r#"//! Integration tests for migrated application
970
971use oxirs_stream::prelude::*;
972
973#[tokio::test]
974async fn test_basic_stream() {
975 // Add your tests here
976 assert!(true);
977}
978
979#[tokio::test]
980async fn test_window_operations() {
981 // Test window operations
982 assert!(true);
983}
984
985#[tokio::test]
986async fn test_aggregations() {
987 // Test aggregations
988 assert!(true);
989}
990"#
991 .to_string()
992 }
993
994 fn platform_name(&self) -> String {
995 match &self.config.source_platform {
996 SourcePlatform::KafkaStreams => "Kafka Streams".to_string(),
997 SourcePlatform::Flink => "Apache Flink".to_string(),
998 SourcePlatform::SparkStreaming => "Spark Streaming".to_string(),
999 SourcePlatform::Storm => "Apache Storm".to_string(),
1000 SourcePlatform::PulsarFunctions => "Pulsar Functions".to_string(),
1001 SourcePlatform::KinesisAnalytics => "Kinesis Analytics".to_string(),
1002 SourcePlatform::Dataflow => "Google Dataflow".to_string(),
1003 SourcePlatform::Beam => "Apache Beam".to_string(),
1004 SourcePlatform::Custom(name) => name.clone(),
1005 }
1006 }
1007}
1008
1009struct FileAnalysis {
1011 lines: usize,
1012 warnings: Vec<MigrationWarning>,
1013 review_items: Vec<ManualReviewItem>,
1014}
1015
1016pub struct QuickStart;
1018
1019impl QuickStart {
1020 pub fn from_kafka_streams(source_dir: &str, output_dir: &str) -> MigrationTool {
1022 MigrationTool::new(MigrationConfig {
1023 source_platform: SourcePlatform::KafkaStreams,
1024 source_dir: PathBuf::from(source_dir),
1025 output_dir: PathBuf::from(output_dir),
1026 ..Default::default()
1027 })
1028 }
1029
1030 pub fn from_flink(source_dir: &str, output_dir: &str) -> MigrationTool {
1032 MigrationTool::new(MigrationConfig {
1033 source_platform: SourcePlatform::Flink,
1034 source_dir: PathBuf::from(source_dir),
1035 output_dir: PathBuf::from(output_dir),
1036 ..Default::default()
1037 })
1038 }
1039
1040 pub fn from_spark(source_dir: &str, output_dir: &str) -> MigrationTool {
1042 MigrationTool::new(MigrationConfig {
1043 source_platform: SourcePlatform::SparkStreaming,
1044 source_dir: PathBuf::from(source_dir),
1045 output_dir: PathBuf::from(output_dir),
1046 ..Default::default()
1047 })
1048 }
1049}
1050
1051#[cfg(test)]
1052mod tests {
1053 use super::*;
1054
1055 fn mig_src() -> String {
1057 std::env::temp_dir()
1058 .join(format!("oxirs_mig_src_{}", std::process::id()))
1059 .display()
1060 .to_string()
1061 }
1062
1063 fn mig_out() -> String {
1065 std::env::temp_dir()
1066 .join(format!("oxirs_mig_out_{}", std::process::id()))
1067 .display()
1068 .to_string()
1069 }
1070
1071 #[tokio::test]
1072 async fn test_migration_tool_creation() {
1073 let config = MigrationConfig::default();
1074 let mut tool = MigrationTool::new(config);
1075 tool.load_default_mappings().await;
1076
1077 let mappings = tool.get_concept_mappings().await;
1078 assert!(!mappings.is_empty());
1079 }
1080
1081 #[tokio::test]
1082 async fn test_kafka_streams_mappings() {
1083 let mut tool = QuickStart::from_kafka_streams(mig_src().as_str(), mig_out().as_str());
1084 tool.load_default_mappings().await;
1085
1086 let concept_mappings = tool.get_concept_mappings().await;
1087 let has_kstream = concept_mappings.iter().any(|m| m.source_name == "KStream");
1088 assert!(has_kstream);
1089 }
1090
1091 #[tokio::test]
1092 async fn test_flink_mappings() {
1093 let mut tool = QuickStart::from_flink(mig_src().as_str(), mig_out().as_str());
1094 tool.load_default_mappings().await; let concept_mappings = tool.get_concept_mappings().await;
1097 let has_datastream = concept_mappings
1098 .iter()
1099 .any(|m| m.source_name == "DataStream");
1100 assert!(has_datastream);
1101 }
1102
1103 #[tokio::test]
1104 async fn test_custom_mapping() {
1105 let config = MigrationConfig::default();
1106 let tool = MigrationTool::new(config);
1107
1108 tool.add_concept_mapping(ConceptMapping {
1109 source_name: "CustomConcept".to_string(),
1110 target_name: "OxirsConcept".to_string(),
1111 description: "Custom mapping".to_string(),
1112 pattern: None,
1113 source_example: None,
1114 target_example: None,
1115 })
1116 .await;
1117
1118 let mappings = tool.get_concept_mappings().await;
1119 let has_custom = mappings.iter().any(|m| m.source_name == "CustomConcept");
1120 assert!(has_custom);
1121 }
1122
1123 #[tokio::test]
1124 async fn test_generate_guide() {
1125 let tool = QuickStart::from_kafka_streams(mig_src().as_str(), mig_out().as_str());
1126
1127 let guide = tool.generate_guide().await;
1128 assert!(guide.contains("Migration Guide"));
1129 assert!(guide.contains("Kafka Streams"));
1130 }
1131
1132 #[tokio::test]
1133 async fn test_analyze_empty_directory() {
1134 let config = MigrationConfig {
1135 source_dir: std::env::temp_dir()
1136 .join(format!("oxirs_mig_nonexistent_{}", std::process::id())),
1137 output_dir: std::env::temp_dir().join(format!("oxirs_mig_out_{}", std::process::id())),
1138 ..Default::default()
1139 };
1140
1141 let tool = MigrationTool::new(config);
1142 let report = tool.analyze().await.unwrap();
1143
1144 assert_eq!(report.files_processed, 0);
1145 assert_eq!(report.compatibility_score, 100.0);
1146 }
1147
1148 #[tokio::test]
1149 async fn test_api_mappings() {
1150 let mut tool = QuickStart::from_kafka_streams(mig_src().as_str(), mig_out().as_str());
1151 tool.load_default_mappings().await; let api_mappings = tool.get_api_mappings().await;
1154 let has_filter = api_mappings.iter().any(|m| m.source_api.contains("filter"));
1155 assert!(has_filter);
1156 }
1157
1158 #[tokio::test]
1159 async fn test_compatibility_score() {
1160 let config = MigrationConfig::default();
1161 let tool = MigrationTool::new(config);
1162
1163 let report = tool.analyze().await.unwrap();
1165 assert!(report.compatibility_score >= 0.0 && report.compatibility_score <= 100.0);
1166 }
1167
1168 #[tokio::test]
1169 async fn test_spark_mappings() {
1170 let mut tool = QuickStart::from_spark(mig_src().as_str(), mig_out().as_str());
1171 tool.load_default_mappings().await;
1172
1173 let mappings = tool.get_concept_mappings().await;
1174 let has_dstream = mappings.iter().any(|m| m.source_name == "DStream");
1175 assert!(has_dstream);
1176 }
1177
1178 #[tokio::test]
1179 async fn test_migration_stats() {
1180 let config = MigrationConfig::default();
1181 let tool = MigrationTool::new(config);
1182
1183 let stats = tool.get_stats().await;
1184 assert_eq!(stats.total_migrations, 0);
1185 }
1186
1187 #[tokio::test]
1188 async fn test_kafka_wrapper_has_no_todo_stubs() {
1189 let tool = MigrationTool::new(MigrationConfig {
1190 source_platform: SourcePlatform::KafkaStreams,
1191 ..Default::default()
1192 });
1193 let wrapper = tool.generate_compatibility_wrapper().await;
1194 assert!(
1195 !wrapper.contains("todo!()"),
1196 "Kafka Streams compatibility wrapper must not contain todo!() stubs; \
1197 found unimplemented body in generated template"
1198 );
1199 assert!(
1200 wrapper.contains("KStreamCompat"),
1201 "wrapper must include KStreamCompat"
1202 );
1203 assert!(
1204 wrapper.contains("KTableCompat"),
1205 "wrapper must include KTableCompat"
1206 );
1207 assert!(
1208 wrapper.contains("map_values"),
1209 "wrapper must include map_values method"
1210 );
1211 assert!(
1212 wrapper.contains("filter"),
1213 "wrapper must include filter method"
1214 );
1215 assert!(
1216 wrapper.contains("group_by"),
1217 "wrapper must include group_by method"
1218 );
1219 }
1220
1221 #[tokio::test]
1222 async fn test_flink_wrapper_has_no_todo_stubs() {
1223 let tool = MigrationTool::new(MigrationConfig {
1224 source_platform: SourcePlatform::Flink,
1225 ..Default::default()
1226 });
1227 let wrapper = tool.generate_compatibility_wrapper().await;
1228 assert!(
1229 !wrapper.contains("todo!()"),
1230 "Flink compatibility wrapper must not contain todo!() stubs; \
1231 found unimplemented body in generated template"
1232 );
1233 assert!(
1234 wrapper.contains("DataStreamCompat"),
1235 "wrapper must include DataStreamCompat"
1236 );
1237 assert!(
1238 wrapper.contains("key_by"),
1239 "wrapper must include key_by method"
1240 );
1241 assert!(
1242 wrapper.contains("KeyedStreamCompat"),
1243 "wrapper must include KeyedStreamCompat"
1244 );
1245 }
1246}