1use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20use std::time::SystemTime;
21use tokio::sync::RwLock;
22
23use crate::error::StreamError;
24
/// Streaming platform a project is being migrated *from*.
///
/// Drives which default concept/API mappings are loaded and which source
/// file extension is scanned for (see `scan_source_directory`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SourcePlatform {
    /// Kafka Streams (Java sources are scanned).
    KafkaStreams,
    /// Apache Flink.
    Flink,
    /// Spark Streaming (Scala sources are scanned).
    SparkStreaming,
    /// Apache Storm.
    Storm,
    /// Pulsar Functions.
    PulsarFunctions,
    /// Kinesis Analytics.
    KinesisAnalytics,
    /// Google Dataflow.
    Dataflow,
    /// Apache Beam.
    Beam,
    /// Any other platform, identified by a free-form display name.
    Custom(String),
}
47
/// Configuration for a migration run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationConfig {
    /// Platform the existing code was written for.
    pub source_platform: SourcePlatform,
    /// Directory scanned for source files to analyze.
    pub source_dir: PathBuf,
    /// Directory where the migrated crate skeleton is written.
    pub output_dir: PathBuf,
    /// Whether to emit `src/compat.rs` with API-compatibility wrappers.
    pub generate_wrappers: bool,
    /// Whether to keep original comments.
    /// NOTE(review): not read anywhere in this module — confirm it is used elsewhere.
    pub preserve_comments: bool,
    /// Whether to emit `tests/integration.rs`.
    pub generate_tests: bool,
    /// Rust edition written into the generated `Cargo.toml`.
    pub rust_edition: String,
    /// Extra dependency lines appended verbatim to `[dependencies]`.
    pub extra_dependencies: Vec<String>,
}
68
69impl Default for MigrationConfig {
70 fn default() -> Self {
71 Self {
72 source_platform: SourcePlatform::KafkaStreams,
73 source_dir: PathBuf::from("./source"),
74 output_dir: PathBuf::from("./migrated"),
75 generate_wrappers: true,
76 preserve_comments: true,
77 generate_tests: true,
78 rust_edition: "2021".to_string(),
79 extra_dependencies: Vec::new(),
80 }
81 }
82}
83
/// Outcome of an `analyze` or `migrate` run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationReport {
    /// Random UUID identifying this report.
    pub report_id: String,
    /// Platform the report was produced for.
    pub source_platform: SourcePlatform,
    /// When the run started.
    pub timestamp: SystemTime,
    /// Number of source files found and examined.
    pub files_processed: usize,
    /// Total lines counted as converted across all files.
    pub lines_converted: usize,
    /// Files analyzed without error.
    pub successful: usize,
    /// Files whose analysis failed.
    pub failed: usize,
    /// Non-fatal findings collected during analysis.
    pub warnings: Vec<MigrationWarning>,
    /// Per-file analysis failures (run continues past them).
    pub errors: Vec<MigrationError>,
    /// Files written by `migrate`.
    pub generated_files: Vec<GeneratedFile>,
    /// Items a human must inspect after migration.
    pub manual_review_items: Vec<ManualReviewItem>,
    /// General advice for the migrated codebase.
    pub suggestions: Vec<MigrationSuggestion>,
    /// Score in [0, 100]; see `calculate_compatibility_score`.
    pub compatibility_score: f64,
}
114
/// Non-fatal finding raised while analyzing a source file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationWarning {
    /// Machine-readable warning code (e.g. "DEPRECATED_API").
    pub code: String,
    /// Human-readable description.
    pub message: String,
    /// File the warning refers to, when known.
    pub file: Option<PathBuf>,
    /// Line number within `file`, when known.
    pub line: Option<usize>,
    /// Suggested remedy, when one exists.
    pub suggestion: Option<String>,
}
129
/// Failure recorded for a single file; the run continues past it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationError {
    /// Machine-readable error code (e.g. "ANALYSIS_ERROR").
    pub code: String,
    /// Human-readable description.
    pub message: String,
    /// File that failed, when known.
    pub file: Option<PathBuf>,
    /// Line number within `file`, when known.
    pub line: Option<usize>,
    /// Whether the migration can proceed despite this error.
    pub recoverable: bool,
}
144
/// Record of one file written by `migrate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GeneratedFile {
    /// Path of the written file.
    pub path: PathBuf,
    /// Role of the file in the generated crate.
    pub file_type: GeneratedFileType,
    /// Line count — note: `migrate` fills in rough estimates, not measurements.
    pub lines: usize,
    /// Original file this was derived from, if any.
    pub source_file: Option<PathBuf>,
}
157
/// Role of a file produced by the migration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum GeneratedFileType {
    /// Regular Rust source (e.g. `src/lib.rs`).
    Source,
    /// Compatibility wrapper (`src/compat.rs`).
    Wrapper,
    /// Test file (`tests/integration.rs`).
    Test,
    /// Build/config file (`Cargo.toml`).
    Config,
    /// Generated documentation.
    Documentation,
}
172
/// Something the migration could not handle automatically; a human must look.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManualReviewItem {
    /// Unique identifier for tracking the item.
    pub id: String,
    /// What needs reviewing.
    pub description: String,
    /// File containing the code in question.
    pub file: PathBuf,
    /// Inclusive (start, end) line span within `file`.
    pub line_range: (usize, usize),
    /// How urgently this should be addressed.
    pub priority: ReviewPriority,
    /// Why automatic migration was not possible.
    pub reason: String,
    /// Recommended approach for the reviewer.
    pub suggestion: String,
}
191
/// Urgency of a manual review item.
///
/// Derived `Ord` follows declaration order, so `Low < Medium < High < Critical`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq)]
pub enum ReviewPriority {
    Low,
    Medium,
    High,
    Critical,
}
204
/// General advice attached to a report (not tied to a specific file).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MigrationSuggestion {
    /// Area of improvement the suggestion belongs to.
    pub category: SuggestionCategory,
    /// Short headline.
    pub title: String,
    /// Full explanation.
    pub description: String,
    /// Optional illustrative code snippet.
    pub example: Option<String>,
    /// Links or citations supporting the suggestion.
    pub references: Vec<String>,
}
219
/// Classification of a `MigrationSuggestion`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SuggestionCategory {
    /// Runtime performance.
    Performance,
    /// Formatting and naming.
    CodeStyle,
    /// General engineering practice.
    BestPractice,
    /// Security hardening.
    Security,
    /// Idiomatic Rust usage.
    RustIdiom,
}
234
/// Maps one source-platform concept (e.g. "KStream") onto its
/// oxirs-stream equivalent, with optional before/after snippets used in
/// the generated migration guide.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConceptMapping {
    /// Concept name on the source platform.
    pub source_name: String,
    /// Equivalent concept in oxirs-stream.
    pub target_name: String,
    /// One-line explanation of the concept.
    pub description: String,
    /// Optional pattern hint (e.g. a macro name) — presumably used by code
    /// generation elsewhere; not read in this module.
    pub pattern: Option<String>,
    /// Example usage on the source platform ("Before" in the guide).
    pub source_example: Option<String>,
    /// Equivalent oxirs-stream usage ("After" in the guide).
    pub target_example: Option<String>,
}
251
/// Maps one source-platform API call onto its oxirs-stream equivalent;
/// rendered as a row in the guide's API reference table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct APIMapping {
    /// Call syntax on the source platform (e.g. "stream.mapValues()").
    pub source_api: String,
    /// Equivalent oxirs-stream call.
    pub target_api: String,
    /// Parameter-level substitutions (source form -> target form).
    pub param_mappings: HashMap<String, String>,
    /// Target return type when it differs, if any.
    pub return_type_mapping: Option<String>,
    /// Free-form migration notes for the table's "Notes" column.
    pub notes: String,
}
266
/// Orchestrates analysis and migration of a streaming codebase to
/// oxirs-stream.
///
/// Mapping tables and statistics live behind `Arc<RwLock<…>>` so the tool
/// can be shared and mutated from async tasks via `&self` methods.
pub struct MigrationTool {
    // Immutable after construction.
    config: MigrationConfig,
    // Concept-level translations, seeded by `load_default_mappings`.
    concept_mappings: Arc<RwLock<Vec<ConceptMapping>>>,
    // API-level translations, seeded by `load_default_mappings`.
    api_mappings: Arc<RwLock<Vec<APIMapping>>>,
    // Aggregate counters updated by `migrate`.
    stats: Arc<RwLock<MigrationStats>>,
}
278
/// Aggregate counters across all `migrate` runs of one tool instance.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct MigrationStats {
    /// Total `migrate` runs performed.
    pub total_migrations: u64,
    /// Runs that completed (incremented together with `total_migrations`).
    pub successful_migrations: u64,
    /// Running average of per-run compatibility scores.
    pub avg_compatibility_score: f64,
    /// Sum of lines converted across all runs.
    pub total_lines_converted: u64,
    /// Sum of files processed across all runs.
    pub total_files_processed: u64,
}
293
294impl MigrationTool {
295 pub fn new(config: MigrationConfig) -> Self {
298 Self {
299 config: config.clone(),
300 concept_mappings: Arc::new(RwLock::new(Vec::new())),
301 api_mappings: Arc::new(RwLock::new(Vec::new())),
302 stats: Arc::new(RwLock::new(MigrationStats::default())),
303 }
304 }
305
    /// Scans the configured source directory and produces a report without
    /// writing any output.
    ///
    /// Per-file analysis failures are recorded as `MigrationError`s in the
    /// report rather than aborting the run.
    ///
    /// # Errors
    /// Returns `StreamError` only when the directory scan itself fails.
    pub async fn analyze(&self) -> Result<MigrationReport, StreamError> {
        // Fresh report; counters are filled in below.
        let mut report = MigrationReport {
            report_id: uuid::Uuid::new_v4().to_string(),
            source_platform: self.config.source_platform.clone(),
            timestamp: SystemTime::now(),
            files_processed: 0,
            lines_converted: 0,
            successful: 0,
            failed: 0,
            warnings: Vec::new(),
            errors: Vec::new(),
            generated_files: Vec::new(),
            manual_review_items: Vec::new(),
            suggestions: Vec::new(),
            compatibility_score: 0.0,
        };

        let source_files = self.scan_source_directory().await?;
        report.files_processed = source_files.len();

        // Analyze each file independently; one failure never stops the rest.
        for file_path in &source_files {
            match self.analyze_file(file_path).await {
                Ok(analysis) => {
                    report.successful += 1;
                    report.lines_converted += analysis.lines;
                    report.warnings.extend(analysis.warnings);
                    report.manual_review_items.extend(analysis.review_items);
                }
                Err(e) => {
                    report.failed += 1;
                    report.errors.push(MigrationError {
                        code: "ANALYSIS_ERROR".to_string(),
                        message: e.to_string(),
                        file: Some(file_path.to_path_buf()),
                        line: None,
                        recoverable: false,
                    });
                }
            }
        }

        // Suggestions and the score both read the accumulated report state.
        report.suggestions = self.generate_suggestions(&report).await;

        report.compatibility_score = self.calculate_compatibility_score(&report);

        Ok(report)
    }
358
    /// Runs `analyze` and then writes the skeleton of the migrated crate
    /// (Cargo.toml, src/lib.rs, optional compat wrappers and tests) into
    /// `config.output_dir`, then folds the run into the aggregate stats.
    ///
    /// NOTE(review): this uses blocking `std::fs` calls inside an async fn;
    /// on a shared runtime consider `tokio::fs` or `spawn_blocking`.
    ///
    /// # Errors
    /// Returns `StreamError::Io` when a directory or file cannot be written.
    pub async fn migrate(&self) -> Result<MigrationReport, StreamError> {
        let mut report = self.analyze().await?;

        if !self.config.output_dir.exists() {
            std::fs::create_dir_all(&self.config.output_dir).map_err(|e| {
                StreamError::Io(format!("Failed to create output directory: {}", e))
            })?;
        }

        // Cargo.toml for the generated crate.
        let cargo_toml = self.generate_cargo_toml().await;
        let cargo_path = self.config.output_dir.join("Cargo.toml");
        std::fs::write(&cargo_path, cargo_toml)
            .map_err(|e| StreamError::Io(format!("Failed to write Cargo.toml: {}", e)))?;

        // NOTE(review): the `lines` values below are hard-coded estimates,
        // not counts of the content actually written.
        report.generated_files.push(GeneratedFile {
            path: cargo_path,
            file_type: GeneratedFileType::Config,
            lines: 30,
            source_file: None,
        });

        // src/lib.rs — crate entry point.
        let lib_rs = self.generate_lib_rs().await;
        let lib_path = self.config.output_dir.join("src").join("lib.rs");
        // Best-effort create; a failure surfaces on the write just below.
        std::fs::create_dir_all(self.config.output_dir.join("src")).ok();
        std::fs::write(&lib_path, lib_rs)
            .map_err(|e| StreamError::Io(format!("Failed to write lib.rs: {}", e)))?;

        report.generated_files.push(GeneratedFile {
            path: lib_path,
            file_type: GeneratedFileType::Source,
            lines: 50,
            source_file: None,
        });

        // Optional API-compatibility wrappers.
        if self.config.generate_wrappers {
            let wrapper = self.generate_compatibility_wrapper().await;
            let wrapper_path = self.config.output_dir.join("src").join("compat.rs");
            std::fs::write(&wrapper_path, wrapper)
                .map_err(|e| StreamError::Io(format!("Failed to write compat.rs: {}", e)))?;

            report.generated_files.push(GeneratedFile {
                path: wrapper_path,
                file_type: GeneratedFileType::Wrapper,
                lines: 200,
                source_file: None,
            });
        }

        // Optional integration test skeleton.
        if self.config.generate_tests {
            let tests = self.generate_tests().await;
            let test_path = self.config.output_dir.join("tests").join("integration.rs");
            std::fs::create_dir_all(self.config.output_dir.join("tests")).ok();
            std::fs::write(&test_path, tests)
                .map_err(|e| StreamError::Io(format!("Failed to write tests: {}", e)))?;

            report.generated_files.push(GeneratedFile {
                path: test_path,
                file_type: GeneratedFileType::Test,
                lines: 100,
                source_file: None,
            });
        }

        // Fold this run into the aggregate statistics; the running average
        // is weighted by the number of completed migrations.
        let mut stats = self.stats.write().await;
        stats.total_migrations += 1;
        stats.successful_migrations += 1;
        stats.total_files_processed += report.files_processed as u64;
        stats.total_lines_converted += report.lines_converted as u64;
        stats.avg_compatibility_score = (stats.avg_compatibility_score
            * (stats.total_migrations - 1) as f64
            + report.compatibility_score)
            / stats.total_migrations as f64;

        Ok(report)
    }
441
442 pub async fn get_concept_mappings(&self) -> Vec<ConceptMapping> {
444 self.concept_mappings.read().await.clone()
445 }
446
447 pub async fn get_api_mappings(&self) -> Vec<APIMapping> {
449 self.api_mappings.read().await.clone()
450 }
451
452 pub async fn add_concept_mapping(&self, mapping: ConceptMapping) {
454 let mut mappings = self.concept_mappings.write().await;
455 mappings.push(mapping);
456 }
457
458 pub async fn add_api_mapping(&self, mapping: APIMapping) {
460 let mut mappings = self.api_mappings.write().await;
461 mappings.push(mapping);
462 }
463
464 pub async fn get_stats(&self) -> MigrationStats {
466 self.stats.read().await.clone()
467 }
468
469 pub async fn generate_guide(&self) -> String {
471 let mut guide = String::new();
472
473 guide.push_str(&format!(
474 "# Migration Guide: {} to oxirs-stream\n\n",
475 self.platform_name()
476 ));
477
478 guide.push_str("## Overview\n\n");
479 guide.push_str(
480 "This guide helps you migrate your streaming application to oxirs-stream.\n\n",
481 );
482
483 guide.push_str("## Key Concepts\n\n");
484
485 let mappings = self.concept_mappings.read().await;
486 for mapping in mappings.iter() {
487 guide.push_str(&format!(
488 "### {} → {}\n\n{}\n\n",
489 mapping.source_name, mapping.target_name, mapping.description
490 ));
491
492 if let Some(ref source) = mapping.source_example {
493 guide.push_str("**Before:**\n```\n");
494 guide.push_str(source);
495 guide.push_str("\n```\n\n");
496 }
497
498 if let Some(ref target) = mapping.target_example {
499 guide.push_str("**After:**\n```rust\n");
500 guide.push_str(target);
501 guide.push_str("\n```\n\n");
502 }
503 }
504
505 guide.push_str("## API Reference\n\n");
506 guide.push_str("| Source API | oxirs-stream API | Notes |\n");
507 guide.push_str("|------------|------------------|-------|\n");
508
509 let api_mappings = self.api_mappings.read().await;
510 for mapping in api_mappings.iter() {
511 guide.push_str(&format!(
512 "| `{}` | `{}` | {} |\n",
513 mapping.source_api, mapping.target_api, mapping.notes
514 ));
515 }
516
517 guide.push_str("\n## Next Steps\n\n");
518 guide.push_str("1. Review the generated code\n");
519 guide.push_str("2. Address manual review items\n");
520 guide.push_str("3. Run the test suite\n");
521 guide.push_str("4. Benchmark performance\n");
522 guide.push_str("5. Deploy gradually with feature flags\n");
523
524 guide
525 }
526
    /// Seeds the concept and API mapping tables with built-in defaults for
    /// the configured source platform.
    ///
    /// Kafka Streams, Flink and Spark Streaming have dedicated tables; every
    /// other platform falls back to a single generic "Stream" mapping.
    ///
    /// NOTE(review): only interior mutability (the RwLocks) is used, so the
    /// `&mut self` receiver could likely be relaxed to `&self`.
    async fn load_default_mappings(&mut self) {
        // Both write locks are held for the whole seeding.
        let mut concept_mappings = self.concept_mappings.write().await;
        let mut api_mappings = self.api_mappings.write().await;

        match self.config.source_platform {
            SourcePlatform::KafkaStreams => {
                // Concept: KStream -> Stream.
                concept_mappings.push(ConceptMapping {
                    source_name: "KStream".to_string(),
                    target_name: "Stream".to_string(),
                    description: "Unbounded stream of records".to_string(),
                    pattern: Some("stream!".to_string()),
                    source_example: Some("KStream<String, String> stream = builder.stream(\"topic\");".to_string()),
                    target_example: Some("let stream = StreamBuilder::new()\n .source(KafkaSource::new(\"topic\"))\n .build();".to_string()),
                });

                // Concept: KTable -> StateStore.
                concept_mappings.push(ConceptMapping {
                    source_name: "KTable".to_string(),
                    target_name: "StateStore".to_string(),
                    description: "Changelog stream / table".to_string(),
                    pattern: None,
                    source_example: Some(
                        "KTable<String, Long> table = builder.table(\"topic\");".to_string(),
                    ),
                    target_example: Some(
                        "let state = StateStore::new(\"table\")\n .with_changelog(\"topic\");"
                            .to_string(),
                    ),
                });

                // API equivalences for the Kafka Streams DSL.
                api_mappings.push(APIMapping {
                    source_api: "stream.mapValues()".to_string(),
                    target_api: "stream.map()".to_string(),
                    param_mappings: HashMap::new(),
                    return_type_mapping: None,
                    notes: "Use map with tuple destructuring".to_string(),
                });

                api_mappings.push(APIMapping {
                    source_api: "stream.filter()".to_string(),
                    target_api: "stream.filter()".to_string(),
                    param_mappings: HashMap::new(),
                    return_type_mapping: None,
                    notes: "Direct equivalent".to_string(),
                });

                api_mappings.push(APIMapping {
                    source_api: "stream.groupByKey()".to_string(),
                    target_api: "stream.group_by_key()".to_string(),
                    param_mappings: HashMap::new(),
                    return_type_mapping: None,
                    notes: "Similar semantics".to_string(),
                });

                api_mappings.push(APIMapping {
                    source_api: "stream.windowedBy()".to_string(),
                    target_api: "stream.window()".to_string(),
                    // Window factory types map one-to-one.
                    param_mappings: {
                        let mut map = HashMap::new();
                        map.insert(
                            "TimeWindows.of()".to_string(),
                            "TumblingWindow::new()".to_string(),
                        );
                        map.insert(
                            "SlidingWindows.of()".to_string(),
                            "SlidingWindow::new()".to_string(),
                        );
                        map
                    },
                    return_type_mapping: None,
                    notes: "Window types map directly".to_string(),
                });
            }

            SourcePlatform::Flink => {
                // Concept: DataStream -> Stream.
                concept_mappings.push(ConceptMapping {
                    source_name: "DataStream".to_string(),
                    target_name: "Stream".to_string(),
                    description: "Core streaming abstraction".to_string(),
                    pattern: None,
                    source_example: Some(
                        "DataStream<String> stream = env.addSource(source);".to_string(),
                    ),
                    target_example: Some(
                        "let stream = StreamBuilder::new().source(source).build();".to_string(),
                    ),
                });

                // Concept: KeyedStream -> GroupedStream.
                concept_mappings.push(ConceptMapping {
                    source_name: "KeyedStream".to_string(),
                    target_name: "GroupedStream".to_string(),
                    description: "Partitioned stream by key".to_string(),
                    pattern: None,
                    source_example: None,
                    target_example: None,
                });

                api_mappings.push(APIMapping {
                    source_api: "stream.keyBy()".to_string(),
                    target_api: "stream.key_by()".to_string(),
                    param_mappings: HashMap::new(),
                    return_type_mapping: None,
                    notes: "Use closure for key extraction".to_string(),
                });

                api_mappings.push(APIMapping {
                    source_api: "stream.process()".to_string(),
                    target_api: "stream.process()".to_string(),
                    param_mappings: HashMap::new(),
                    return_type_mapping: None,
                    notes: "Implement ProcessFunction trait".to_string(),
                });
            }

            SourcePlatform::SparkStreaming => {
                // Concept: DStream -> Stream.
                concept_mappings.push(ConceptMapping {
                    source_name: "DStream".to_string(),
                    target_name: "Stream".to_string(),
                    description: "Discretized stream".to_string(),
                    pattern: None,
                    source_example: Some("val stream = ssc.socketTextStream(host, port)".to_string()),
                    target_example: Some("let stream = StreamBuilder::new()\n .source(TcpSource::new(host, port))\n .build();".to_string()),
                });

                api_mappings.push(APIMapping {
                    source_api: "stream.transform()".to_string(),
                    target_api: "stream.map()".to_string(),
                    param_mappings: HashMap::new(),
                    return_type_mapping: None,
                    notes: "Use map for transformations".to_string(),
                });

                api_mappings.push(APIMapping {
                    source_api: "stream.foreachRDD()".to_string(),
                    target_api: "stream.for_each()".to_string(),
                    param_mappings: HashMap::new(),
                    return_type_mapping: None,
                    notes: "Processes each micro-batch".to_string(),
                });
            }

            // All remaining platforms get a single generic mapping.
            _ => {
                concept_mappings.push(ConceptMapping {
                    source_name: "Stream".to_string(),
                    target_name: "Stream".to_string(),
                    description: "Core streaming abstraction".to_string(),
                    pattern: None,
                    source_example: None,
                    target_example: None,
                });
            }
        }
    }
686
687 async fn scan_source_directory(&self) -> Result<Vec<PathBuf>, StreamError> {
688 let mut files = Vec::new();
689
690 if !self.config.source_dir.exists() {
691 return Ok(files);
692 }
693
694 let extension = match self.config.source_platform {
695 SourcePlatform::KafkaStreams | SourcePlatform::Flink | SourcePlatform::Storm => "java",
696 SourcePlatform::SparkStreaming => "scala",
697 SourcePlatform::PulsarFunctions => "java",
698 SourcePlatform::KinesisAnalytics | SourcePlatform::Dataflow | SourcePlatform::Beam => {
699 "java"
700 }
701 SourcePlatform::Custom(_) => "java",
702 };
703
704 Self::scan_directory_recursive(&self.config.source_dir, extension, &mut files)?;
705
706 Ok(files)
707 }
708
709 fn scan_directory_recursive(
710 dir: &Path,
711 extension: &str,
712 files: &mut Vec<PathBuf>,
713 ) -> Result<(), StreamError> {
714 if dir.is_dir() {
715 for entry in std::fs::read_dir(dir)
716 .map_err(|e| StreamError::Io(format!("Failed to read directory: {}", e)))?
717 {
718 let entry =
719 entry.map_err(|e| StreamError::Io(format!("Failed to read entry: {}", e)))?;
720 let path = entry.path();
721
722 if path.is_dir() {
723 Self::scan_directory_recursive(&path, extension, files)?;
724 } else if path.extension().map(|e| e == extension).unwrap_or(false) {
725 files.push(path);
726 }
727 }
728 }
729
730 Ok(())
731 }
732
    /// Placeholder per-file analysis.
    ///
    /// NOTE(review): no actual parsing happens yet — `lines: 100` is a stub
    /// value, not a measurement, and the same generic warning is attached
    /// to every file.
    async fn analyze_file(&self, file_path: &Path) -> Result<FileAnalysis, StreamError> {
        Ok(FileAnalysis {
            lines: 100,
            warnings: vec![MigrationWarning {
                code: "DEPRECATED_API".to_string(),
                message: "Some APIs may need manual review".to_string(),
                file: Some(file_path.to_path_buf()),
                line: None,
                suggestion: Some("Check API mappings".to_string()),
            }],
            review_items: vec![],
        })
    }
748
749 async fn generate_suggestions(&self, report: &MigrationReport) -> Vec<MigrationSuggestion> {
750 let mut suggestions = Vec::new();
751
752 suggestions.push(MigrationSuggestion {
754 category: SuggestionCategory::Performance,
755 title: "Use async/await for I/O operations".to_string(),
756 description: "oxirs-stream is built on Tokio async runtime. Ensure all I/O operations use async methods.".to_string(),
757 example: Some("async fn process(event: Event) -> Result<Output, Error> {\n // Use .await for async operations\n}".to_string()),
758 references: vec!["https://tokio.rs/".to_string()],
759 });
760
761 suggestions.push(MigrationSuggestion {
763 category: SuggestionCategory::BestPractice,
764 title: "Use structured error handling".to_string(),
765 description: "Replace exceptions with Result types for better error propagation.".to_string(),
766 example: Some("fn process() -> Result<(), StreamError> {\n // Return errors instead of throwing\n}".to_string()),
767 references: vec![],
768 });
769
770 if report.files_processed > 0 {
772 suggestions.push(MigrationSuggestion {
773 category: SuggestionCategory::RustIdiom,
774 title: "Use iterators instead of loops".to_string(),
775 description:
776 "Rust iterators are often more performant and idiomatic than explicit loops."
777 .to_string(),
778 example: Some("let sum: i32 = values.iter().map(|x| x * 2).sum();".to_string()),
779 references: vec![],
780 });
781 }
782
783 suggestions
784 }
785
786 fn calculate_compatibility_score(&self, report: &MigrationReport) -> f64 {
787 if report.files_processed == 0 {
788 return 100.0;
789 }
790
791 let success_rate = report.successful as f64 / report.files_processed as f64;
792 let warning_penalty = (report.warnings.len() as f64 * 2.0).min(20.0);
793 let error_penalty = (report.errors.len() as f64 * 5.0).min(50.0);
794 let review_penalty = (report.manual_review_items.len() as f64).min(10.0);
795
796 (success_rate * 100.0 - warning_penalty - error_penalty - review_penalty).max(0.0)
797 }
798
    /// Renders the `Cargo.toml` for the migrated crate.
    ///
    /// `extra_dependencies` entries are emitted verbatim, one per line, at
    /// the end of the `[dependencies]` table.
    async fn generate_cargo_toml(&self) -> String {
        format!(
            r#"[package]
name = "migrated-stream"
version = "0.1.0"
edition = "{}"

[dependencies]
oxirs-stream = "0.1"
tokio = {{ version = "1", features = ["full"] }}
serde = {{ version = "1", features = ["derive"] }}
serde_json = "1"
{}
"#,
            self.config.rust_edition,
            self.config.extra_dependencies.join("\n")
        )
    }
817
818 async fn generate_lib_rs(&self) -> String {
819 format!(
820 r#"//! Migrated streaming application from {}
821//! Generated by oxirs-stream migration tool
822
823{}
824pub mod compat;
825
826pub use oxirs_stream::prelude::*;
827
828// Your migrated stream processors go here
829"#,
830 self.platform_name(),
831 if self.config.generate_wrappers {
832 ""
833 } else {
834 "// Compatibility wrappers disabled\n"
835 }
836 )
837 }
838
    /// Returns the source text of `src/compat.rs`: wrapper types that mimic
    /// the source platform's API on top of oxirs-stream.
    ///
    /// Only Kafka Streams and Flink have dedicated wrappers; every other
    /// platform gets a stub file. The wrapper method bodies are `todo!()`
    /// placeholders for the user to fill in.
    async fn generate_compatibility_wrapper(&self) -> String {
        match self.config.source_platform {
            // KStream / KTable shims.
            SourcePlatform::KafkaStreams => r#"//! Compatibility wrappers for Kafka Streams API

use oxirs_stream::prelude::*;

/// KStream-like wrapper for familiar API
pub struct KStreamCompat<K, V> {
    inner: Stream<(K, V)>,
}

impl<K, V> KStreamCompat<K, V>
where
    K: Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    pub fn new(stream: Stream<(K, V)>) -> Self {
        Self { inner: stream }
    }

    pub fn map_values<F, V2>(self, f: F) -> KStreamCompat<K, V2>
    where
        F: Fn(V) -> V2 + Send + Sync + Clone + 'static,
        V2: Clone + Send + Sync + 'static,
    {
        // Implementation would go here
        todo!()
    }

    pub fn filter<F>(self, predicate: F) -> Self
    where
        F: Fn(&K, &V) -> bool + Send + Sync + Clone + 'static,
    {
        // Implementation would go here
        todo!()
    }
}

/// KTable-like wrapper
pub struct KTableCompat<K, V> {
    store: StateStore<K, V>,
}
"#
            .to_string(),
            // DataStream / KeyedStream shims.
            SourcePlatform::Flink => r#"//! Compatibility wrappers for Flink API

use oxirs_stream::prelude::*;

/// DataStream-like wrapper
pub struct DataStreamCompat<T> {
    inner: Stream<T>,
}

impl<T> DataStreamCompat<T>
where
    T: Clone + Send + Sync + 'static,
{
    pub fn new(stream: Stream<T>) -> Self {
        Self { inner: stream }
    }

    pub fn key_by<K, F>(self, key_selector: F) -> KeyedStreamCompat<K, T>
    where
        K: Clone + Send + Sync + 'static,
        F: Fn(&T) -> K + Send + Sync + Clone + 'static,
    {
        // Implementation would go here
        todo!()
    }
}

/// KeyedStream-like wrapper
pub struct KeyedStreamCompat<K, T> {
    inner: GroupedStream<K, T>,
}
"#
            .to_string(),
            // Everything else: a stub the user extends by hand.
            _ => r#"//! Generic compatibility wrappers

use oxirs_stream::prelude::*;

// Add platform-specific wrappers as needed
"#
            .to_string(),
        }
    }
925
    /// Returns the source text of `tests/integration.rs`: a skeleton of
    /// three trivially-passing tokio tests for the user to flesh out.
    async fn generate_tests(&self) -> String {
        r#"//! Integration tests for migrated application

use oxirs_stream::prelude::*;

#[tokio::test]
async fn test_basic_stream() {
    // Add your tests here
    assert!(true);
}

#[tokio::test]
async fn test_window_operations() {
    // Test window operations
    assert!(true);
}

#[tokio::test]
async fn test_aggregations() {
    // Test aggregations
    assert!(true);
}
"#
        .to_string()
    }
951
952 fn platform_name(&self) -> String {
953 match &self.config.source_platform {
954 SourcePlatform::KafkaStreams => "Kafka Streams".to_string(),
955 SourcePlatform::Flink => "Apache Flink".to_string(),
956 SourcePlatform::SparkStreaming => "Spark Streaming".to_string(),
957 SourcePlatform::Storm => "Apache Storm".to_string(),
958 SourcePlatform::PulsarFunctions => "Pulsar Functions".to_string(),
959 SourcePlatform::KinesisAnalytics => "Kinesis Analytics".to_string(),
960 SourcePlatform::Dataflow => "Google Dataflow".to_string(),
961 SourcePlatform::Beam => "Apache Beam".to_string(),
962 SourcePlatform::Custom(name) => name.clone(),
963 }
964 }
965}
966
/// Internal result of analyzing a single source file.
struct FileAnalysis {
    // Lines counted as converted for this file.
    lines: usize,
    // Non-fatal findings to merge into the report.
    warnings: Vec<MigrationWarning>,
    // Items needing human attention to merge into the report.
    review_items: Vec<ManualReviewItem>,
}
973
974pub struct QuickStart;
976
977impl QuickStart {
978 pub fn from_kafka_streams(source_dir: &str, output_dir: &str) -> MigrationTool {
980 MigrationTool::new(MigrationConfig {
981 source_platform: SourcePlatform::KafkaStreams,
982 source_dir: PathBuf::from(source_dir),
983 output_dir: PathBuf::from(output_dir),
984 ..Default::default()
985 })
986 }
987
988 pub fn from_flink(source_dir: &str, output_dir: &str) -> MigrationTool {
990 MigrationTool::new(MigrationConfig {
991 source_platform: SourcePlatform::Flink,
992 source_dir: PathBuf::from(source_dir),
993 output_dir: PathBuf::from(output_dir),
994 ..Default::default()
995 })
996 }
997
998 pub fn from_spark(source_dir: &str, output_dir: &str) -> MigrationTool {
1000 MigrationTool::new(MigrationConfig {
1001 source_platform: SourcePlatform::SparkStreaming,
1002 source_dir: PathBuf::from(source_dir),
1003 output_dir: PathBuf::from(output_dir),
1004 ..Default::default()
1005 })
1006 }
1007}
1008
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_migration_tool_creation() {
        let config = MigrationConfig::default();
        let mut tool = MigrationTool::new(config);
        tool.load_default_mappings().await;

        let mappings = tool.get_concept_mappings().await;
        assert!(!mappings.is_empty());
    }

    #[tokio::test]
    async fn test_kafka_streams_mappings() {
        let mut tool = QuickStart::from_kafka_streams("/tmp/source", "/tmp/output");
        tool.load_default_mappings().await;

        let concept_mappings = tool.get_concept_mappings().await;
        let has_kstream = concept_mappings.iter().any(|m| m.source_name == "KStream");
        assert!(has_kstream);
    }

    #[tokio::test]
    async fn test_flink_mappings() {
        let mut tool = QuickStart::from_flink("/tmp/source", "/tmp/output");
        // Was fused onto one line with the statement below; split for readability.
        tool.load_default_mappings().await;

        let concept_mappings = tool.get_concept_mappings().await;
        let has_datastream = concept_mappings
            .iter()
            .any(|m| m.source_name == "DataStream");
        assert!(has_datastream);
    }

    #[tokio::test]
    async fn test_custom_mapping() {
        let config = MigrationConfig::default();
        let tool = MigrationTool::new(config);

        tool.add_concept_mapping(ConceptMapping {
            source_name: "CustomConcept".to_string(),
            target_name: "OxirsConcept".to_string(),
            description: "Custom mapping".to_string(),
            pattern: None,
            source_example: None,
            target_example: None,
        })
        .await;

        let mappings = tool.get_concept_mappings().await;
        let has_custom = mappings.iter().any(|m| m.source_name == "CustomConcept");
        assert!(has_custom);
    }

    #[tokio::test]
    async fn test_generate_guide() {
        let tool = QuickStart::from_kafka_streams("/tmp/source", "/tmp/output");

        let guide = tool.generate_guide().await;
        assert!(guide.contains("Migration Guide"));
        assert!(guide.contains("Kafka Streams"));
    }

    #[tokio::test]
    async fn test_analyze_empty_directory() {
        let config = MigrationConfig {
            source_dir: PathBuf::from("/tmp/nonexistent"),
            output_dir: PathBuf::from("/tmp/output"),
            ..Default::default()
        };

        let tool = MigrationTool::new(config);
        let report = tool.analyze().await.unwrap();

        assert_eq!(report.files_processed, 0);
        assert_eq!(report.compatibility_score, 100.0);
    }

    #[tokio::test]
    async fn test_api_mappings() {
        let mut tool = QuickStart::from_kafka_streams("/tmp/source", "/tmp/output");
        // Was fused onto one line with the statement below; split for readability.
        tool.load_default_mappings().await;

        let api_mappings = tool.get_api_mappings().await;
        let has_filter = api_mappings.iter().any(|m| m.source_api.contains("filter"));
        assert!(has_filter);
    }

    #[tokio::test]
    async fn test_compatibility_score() {
        let config = MigrationConfig::default();
        let tool = MigrationTool::new(config);

        let report = tool.analyze().await.unwrap();
        assert!(report.compatibility_score >= 0.0 && report.compatibility_score <= 100.0);
    }

    #[tokio::test]
    async fn test_spark_mappings() {
        let mut tool = QuickStart::from_spark("/tmp/source", "/tmp/output");
        tool.load_default_mappings().await;

        let mappings = tool.get_concept_mappings().await;
        let has_dstream = mappings.iter().any(|m| m.source_name == "DStream");
        assert!(has_dstream);
    }

    #[tokio::test]
    async fn test_migration_stats() {
        let config = MigrationConfig::default();
        let tool = MigrationTool::new(config);

        let stats = tool.get_stats().await;
        assert_eq!(stats.total_migrations, 0);
    }
}