Skip to main content

oxirs_stream/
migration_tools.rs

1//! Migration Tools from Other Streaming Platforms
2//!
3//! This module provides tools to migrate streaming applications from other
4//! platforms to oxirs-stream with minimal code changes.
5//!
6//! # Supported Platforms
7//!
8//! - Apache Kafka Streams
9//! - Apache Flink
10//! - Apache Spark Streaming
11//! - Apache Storm
12//! - Apache Pulsar Functions
13//! - AWS Kinesis
14//! - Google Dataflow/Beam
15
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20use std::time::SystemTime;
21use tokio::sync::RwLock;
22
23use crate::error::StreamError;
24
25/// Source platform type
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
27pub enum SourcePlatform {
28    /// Apache Kafka Streams
29    KafkaStreams,
30    /// Apache Flink
31    Flink,
32    /// Apache Spark Streaming
33    SparkStreaming,
34    /// Apache Storm
35    Storm,
36    /// Apache Pulsar Functions
37    PulsarFunctions,
38    /// AWS Kinesis Data Analytics
39    KinesisAnalytics,
40    /// Google Cloud Dataflow
41    Dataflow,
42    /// Apache Beam (generic)
43    Beam,
44    /// Custom platform
45    Custom(String),
46}
47
48/// Migration configuration
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct MigrationConfig {
51    /// Source platform
52    pub source_platform: SourcePlatform,
53    /// Source code directory
54    pub source_dir: PathBuf,
55    /// Output directory
56    pub output_dir: PathBuf,
57    /// Generate compatibility wrappers
58    pub generate_wrappers: bool,
59    /// Preserve original comments
60    pub preserve_comments: bool,
61    /// Generate tests
62    pub generate_tests: bool,
63    /// Target Rust edition
64    pub rust_edition: String,
65    /// Additional dependencies
66    pub extra_dependencies: Vec<String>,
67}
68
69impl Default for MigrationConfig {
70    fn default() -> Self {
71        Self {
72            source_platform: SourcePlatform::KafkaStreams,
73            source_dir: PathBuf::from("./source"),
74            output_dir: PathBuf::from("./migrated"),
75            generate_wrappers: true,
76            preserve_comments: true,
77            generate_tests: true,
78            rust_edition: "2021".to_string(),
79            extra_dependencies: Vec::new(),
80        }
81    }
82}
83
84/// Migration report
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct MigrationReport {
87    /// Report ID
88    pub report_id: String,
89    /// Source platform
90    pub source_platform: SourcePlatform,
91    /// Migration timestamp
92    pub timestamp: SystemTime,
93    /// Files processed
94    pub files_processed: usize,
95    /// Lines of code converted
96    pub lines_converted: usize,
97    /// Successful conversions
98    pub successful: usize,
99    /// Failed conversions
100    pub failed: usize,
101    /// Warnings generated
102    pub warnings: Vec<MigrationWarning>,
103    /// Errors encountered
104    pub errors: Vec<MigrationError>,
105    /// Generated files
106    pub generated_files: Vec<GeneratedFile>,
107    /// Manual review required
108    pub manual_review_items: Vec<ManualReviewItem>,
109    /// Migration suggestions
110    pub suggestions: Vec<MigrationSuggestion>,
111    /// Compatibility score (0-100)
112    pub compatibility_score: f64,
113}
114
115/// Migration warning
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct MigrationWarning {
118    /// Warning code
119    pub code: String,
120    /// Warning message
121    pub message: String,
122    /// File location
123    pub file: Option<PathBuf>,
124    /// Line number
125    pub line: Option<usize>,
126    /// Suggested fix
127    pub suggestion: Option<String>,
128}
129
130/// Migration error
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct MigrationError {
133    /// Error code
134    pub code: String,
135    /// Error message
136    pub message: String,
137    /// File location
138    pub file: Option<PathBuf>,
139    /// Line number
140    pub line: Option<usize>,
141    /// Is recoverable
142    pub recoverable: bool,
143}
144
145/// Generated file information
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct GeneratedFile {
148    /// File path
149    pub path: PathBuf,
150    /// File type
151    pub file_type: GeneratedFileType,
152    /// Lines of code
153    pub lines: usize,
154    /// Original source file
155    pub source_file: Option<PathBuf>,
156}
157
158/// Generated file type
159#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
160pub enum GeneratedFileType {
161    /// Main source code
162    Source,
163    /// Compatibility wrapper
164    Wrapper,
165    /// Test file
166    Test,
167    /// Configuration
168    Config,
169    /// Documentation
170    Documentation,
171}
172
173/// Item requiring manual review
174#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct ManualReviewItem {
176    /// Item ID
177    pub id: String,
178    /// Description
179    pub description: String,
180    /// File location
181    pub file: PathBuf,
182    /// Line range
183    pub line_range: (usize, usize),
184    /// Priority
185    pub priority: ReviewPriority,
186    /// Reason for manual review
187    pub reason: String,
188    /// Suggested approach
189    pub suggestion: String,
190}
191
192/// Review priority
193#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq)]
194pub enum ReviewPriority {
195    /// Low priority
196    Low,
197    /// Medium priority
198    Medium,
199    /// High priority
200    High,
201    /// Critical - must be reviewed
202    Critical,
203}
204
205/// Migration suggestion
206#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct MigrationSuggestion {
208    /// Suggestion category
209    pub category: SuggestionCategory,
210    /// Suggestion title
211    pub title: String,
212    /// Detailed description
213    pub description: String,
214    /// Code example
215    pub example: Option<String>,
216    /// References
217    pub references: Vec<String>,
218}
219
220/// Suggestion category
221#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
222pub enum SuggestionCategory {
223    /// Performance improvement
224    Performance,
225    /// Code style
226    CodeStyle,
227    /// Best practice
228    BestPractice,
229    /// Security
230    Security,
231    /// Idiomatic Rust
232    RustIdiom,
233}
234
235/// Concept mapping between platforms
236#[derive(Debug, Clone, Serialize, Deserialize)]
237pub struct ConceptMapping {
238    /// Source concept name
239    pub source_name: String,
240    /// Target oxirs-stream equivalent
241    pub target_name: String,
242    /// Description of the mapping
243    pub description: String,
244    /// Code transformation pattern
245    pub pattern: Option<String>,
246    /// Example source code
247    pub source_example: Option<String>,
248    /// Example target code
249    pub target_example: Option<String>,
250}
251
252/// API mapping for code transformation
253#[derive(Debug, Clone, Serialize, Deserialize)]
254pub struct APIMapping {
255    /// Source API call
256    pub source_api: String,
257    /// Target API call
258    pub target_api: String,
259    /// Parameter mappings
260    pub param_mappings: HashMap<String, String>,
261    /// Return type mapping
262    pub return_type_mapping: Option<String>,
263    /// Notes
264    pub notes: String,
265}
266
267/// Migration tool for converting streaming applications
268pub struct MigrationTool {
269    /// Configuration
270    config: MigrationConfig,
271    /// Concept mappings
272    concept_mappings: Arc<RwLock<Vec<ConceptMapping>>>,
273    /// API mappings
274    api_mappings: Arc<RwLock<Vec<APIMapping>>>,
275    /// Migration statistics
276    stats: Arc<RwLock<MigrationStats>>,
277}
278
279/// Migration statistics
280#[derive(Debug, Clone, Default, Serialize, Deserialize)]
281pub struct MigrationStats {
282    /// Total migrations performed
283    pub total_migrations: u64,
284    /// Successful migrations
285    pub successful_migrations: u64,
286    /// Average compatibility score
287    pub avg_compatibility_score: f64,
288    /// Total lines converted
289    pub total_lines_converted: u64,
290    /// Total files processed
291    pub total_files_processed: u64,
292}
293
294impl MigrationTool {
295    /// Create a new migration tool
296    /// Note: Call load_default_mappings().await after creation to initialize default mappings
297    pub fn new(config: MigrationConfig) -> Self {
298        Self {
299            config: config.clone(),
300            concept_mappings: Arc::new(RwLock::new(Vec::new())),
301            api_mappings: Arc::new(RwLock::new(Vec::new())),
302            stats: Arc::new(RwLock::new(MigrationStats::default())),
303        }
304    }
305
306    /// Perform migration analysis without generating code
307    pub async fn analyze(&self) -> Result<MigrationReport, StreamError> {
308        let mut report = MigrationReport {
309            report_id: uuid::Uuid::new_v4().to_string(),
310            source_platform: self.config.source_platform.clone(),
311            timestamp: SystemTime::now(),
312            files_processed: 0,
313            lines_converted: 0,
314            successful: 0,
315            failed: 0,
316            warnings: Vec::new(),
317            errors: Vec::new(),
318            generated_files: Vec::new(),
319            manual_review_items: Vec::new(),
320            suggestions: Vec::new(),
321            compatibility_score: 0.0,
322        };
323
324        // Scan source directory
325        let source_files = self.scan_source_directory().await?;
326        report.files_processed = source_files.len();
327
328        // Analyze each file
329        for file_path in &source_files {
330            match self.analyze_file(file_path).await {
331                Ok(analysis) => {
332                    report.successful += 1;
333                    report.lines_converted += analysis.lines;
334                    report.warnings.extend(analysis.warnings);
335                    report.manual_review_items.extend(analysis.review_items);
336                }
337                Err(e) => {
338                    report.failed += 1;
339                    report.errors.push(MigrationError {
340                        code: "ANALYSIS_ERROR".to_string(),
341                        message: e.to_string(),
342                        file: Some(file_path.to_path_buf()),
343                        line: None,
344                        recoverable: false,
345                    });
346                }
347            }
348        }
349
350        // Generate suggestions
351        report.suggestions = self.generate_suggestions(&report).await;
352
353        // Calculate compatibility score
354        report.compatibility_score = self.calculate_compatibility_score(&report);
355
356        Ok(report)
357    }
358
359    /// Perform full migration
360    pub async fn migrate(&self) -> Result<MigrationReport, StreamError> {
361        let mut report = self.analyze().await?;
362
363        // Create output directory
364        if !self.config.output_dir.exists() {
365            std::fs::create_dir_all(&self.config.output_dir).map_err(|e| {
366                StreamError::Io(format!("Failed to create output directory: {}", e))
367            })?;
368        }
369
370        // Generate Cargo.toml
371        let cargo_toml = self.generate_cargo_toml().await;
372        let cargo_path = self.config.output_dir.join("Cargo.toml");
373        std::fs::write(&cargo_path, cargo_toml)
374            .map_err(|e| StreamError::Io(format!("Failed to write Cargo.toml: {}", e)))?;
375
376        report.generated_files.push(GeneratedFile {
377            path: cargo_path,
378            file_type: GeneratedFileType::Config,
379            lines: 30,
380            source_file: None,
381        });
382
383        // Generate main library file
384        let lib_rs = self.generate_lib_rs().await;
385        let lib_path = self.config.output_dir.join("src").join("lib.rs");
386        std::fs::create_dir_all(self.config.output_dir.join("src")).ok();
387        std::fs::write(&lib_path, lib_rs)
388            .map_err(|e| StreamError::Io(format!("Failed to write lib.rs: {}", e)))?;
389
390        report.generated_files.push(GeneratedFile {
391            path: lib_path,
392            file_type: GeneratedFileType::Source,
393            lines: 50,
394            source_file: None,
395        });
396
397        // Generate compatibility wrappers if requested
398        if self.config.generate_wrappers {
399            let wrapper = self.generate_compatibility_wrapper().await;
400            let wrapper_path = self.config.output_dir.join("src").join("compat.rs");
401            std::fs::write(&wrapper_path, wrapper)
402                .map_err(|e| StreamError::Io(format!("Failed to write compat.rs: {}", e)))?;
403
404            report.generated_files.push(GeneratedFile {
405                path: wrapper_path,
406                file_type: GeneratedFileType::Wrapper,
407                lines: 200,
408                source_file: None,
409            });
410        }
411
412        // Generate tests if requested
413        if self.config.generate_tests {
414            let tests = self.generate_tests().await;
415            let test_path = self.config.output_dir.join("tests").join("integration.rs");
416            std::fs::create_dir_all(self.config.output_dir.join("tests")).ok();
417            std::fs::write(&test_path, tests)
418                .map_err(|e| StreamError::Io(format!("Failed to write tests: {}", e)))?;
419
420            report.generated_files.push(GeneratedFile {
421                path: test_path,
422                file_type: GeneratedFileType::Test,
423                lines: 100,
424                source_file: None,
425            });
426        }
427
428        // Update statistics
429        let mut stats = self.stats.write().await;
430        stats.total_migrations += 1;
431        stats.successful_migrations += 1;
432        stats.total_files_processed += report.files_processed as u64;
433        stats.total_lines_converted += report.lines_converted as u64;
434        stats.avg_compatibility_score = (stats.avg_compatibility_score
435            * (stats.total_migrations - 1) as f64
436            + report.compatibility_score)
437            / stats.total_migrations as f64;
438
439        Ok(report)
440    }
441
442    /// Get concept mappings for a platform
443    pub async fn get_concept_mappings(&self) -> Vec<ConceptMapping> {
444        self.concept_mappings.read().await.clone()
445    }
446
447    /// Get API mappings
448    pub async fn get_api_mappings(&self) -> Vec<APIMapping> {
449        self.api_mappings.read().await.clone()
450    }
451
452    /// Add custom concept mapping
453    pub async fn add_concept_mapping(&self, mapping: ConceptMapping) {
454        let mut mappings = self.concept_mappings.write().await;
455        mappings.push(mapping);
456    }
457
458    /// Add custom API mapping
459    pub async fn add_api_mapping(&self, mapping: APIMapping) {
460        let mut mappings = self.api_mappings.write().await;
461        mappings.push(mapping);
462    }
463
464    /// Get migration statistics
465    pub async fn get_stats(&self) -> MigrationStats {
466        self.stats.read().await.clone()
467    }
468
469    /// Generate migration guide for the source platform
470    pub async fn generate_guide(&self) -> String {
471        let mut guide = String::new();
472
473        guide.push_str(&format!(
474            "# Migration Guide: {} to oxirs-stream\n\n",
475            self.platform_name()
476        ));
477
478        guide.push_str("## Overview\n\n");
479        guide.push_str(
480            "This guide helps you migrate your streaming application to oxirs-stream.\n\n",
481        );
482
483        guide.push_str("## Key Concepts\n\n");
484
485        let mappings = self.concept_mappings.read().await;
486        for mapping in mappings.iter() {
487            guide.push_str(&format!(
488                "### {} → {}\n\n{}\n\n",
489                mapping.source_name, mapping.target_name, mapping.description
490            ));
491
492            if let Some(ref source) = mapping.source_example {
493                guide.push_str("**Before:**\n```\n");
494                guide.push_str(source);
495                guide.push_str("\n```\n\n");
496            }
497
498            if let Some(ref target) = mapping.target_example {
499                guide.push_str("**After:**\n```rust\n");
500                guide.push_str(target);
501                guide.push_str("\n```\n\n");
502            }
503        }
504
505        guide.push_str("## API Reference\n\n");
506        guide.push_str("| Source API | oxirs-stream API | Notes |\n");
507        guide.push_str("|------------|------------------|-------|\n");
508
509        let api_mappings = self.api_mappings.read().await;
510        for mapping in api_mappings.iter() {
511            guide.push_str(&format!(
512                "| `{}` | `{}` | {} |\n",
513                mapping.source_api, mapping.target_api, mapping.notes
514            ));
515        }
516
517        guide.push_str("\n## Next Steps\n\n");
518        guide.push_str("1. Review the generated code\n");
519        guide.push_str("2. Address manual review items\n");
520        guide.push_str("3. Run the test suite\n");
521        guide.push_str("4. Benchmark performance\n");
522        guide.push_str("5. Deploy gradually with feature flags\n");
523
524        guide
525    }
526
527    // Private helper methods
528
529    async fn load_default_mappings(&mut self) {
530        let mut concept_mappings = self.concept_mappings.write().await;
531        let mut api_mappings = self.api_mappings.write().await;
532
533        match self.config.source_platform {
534            SourcePlatform::KafkaStreams => {
535                // Kafka Streams concept mappings
536                concept_mappings.push(ConceptMapping {
537                    source_name: "KStream".to_string(),
538                    target_name: "Stream".to_string(),
539                    description: "Unbounded stream of records".to_string(),
540                    pattern: Some("stream!".to_string()),
541                    source_example: Some("KStream<String, String> stream = builder.stream(\"topic\");".to_string()),
542                    target_example: Some("let stream = StreamBuilder::new()\n    .source(KafkaSource::new(\"topic\"))\n    .build();".to_string()),
543                });
544
545                concept_mappings.push(ConceptMapping {
546                    source_name: "KTable".to_string(),
547                    target_name: "StateStore".to_string(),
548                    description: "Changelog stream / table".to_string(),
549                    pattern: None,
550                    source_example: Some(
551                        "KTable<String, Long> table = builder.table(\"topic\");".to_string(),
552                    ),
553                    target_example: Some(
554                        "let state = StateStore::new(\"table\")\n    .with_changelog(\"topic\");"
555                            .to_string(),
556                    ),
557                });
558
559                // API mappings
560                api_mappings.push(APIMapping {
561                    source_api: "stream.mapValues()".to_string(),
562                    target_api: "stream.map()".to_string(),
563                    param_mappings: HashMap::new(),
564                    return_type_mapping: None,
565                    notes: "Use map with tuple destructuring".to_string(),
566                });
567
568                api_mappings.push(APIMapping {
569                    source_api: "stream.filter()".to_string(),
570                    target_api: "stream.filter()".to_string(),
571                    param_mappings: HashMap::new(),
572                    return_type_mapping: None,
573                    notes: "Direct equivalent".to_string(),
574                });
575
576                api_mappings.push(APIMapping {
577                    source_api: "stream.groupByKey()".to_string(),
578                    target_api: "stream.group_by_key()".to_string(),
579                    param_mappings: HashMap::new(),
580                    return_type_mapping: None,
581                    notes: "Similar semantics".to_string(),
582                });
583
584                api_mappings.push(APIMapping {
585                    source_api: "stream.windowedBy()".to_string(),
586                    target_api: "stream.window()".to_string(),
587                    param_mappings: {
588                        let mut map = HashMap::new();
589                        map.insert(
590                            "TimeWindows.of()".to_string(),
591                            "TumblingWindow::new()".to_string(),
592                        );
593                        map.insert(
594                            "SlidingWindows.of()".to_string(),
595                            "SlidingWindow::new()".to_string(),
596                        );
597                        map
598                    },
599                    return_type_mapping: None,
600                    notes: "Window types map directly".to_string(),
601                });
602            }
603
604            SourcePlatform::Flink => {
605                // Flink concept mappings
606                concept_mappings.push(ConceptMapping {
607                    source_name: "DataStream".to_string(),
608                    target_name: "Stream".to_string(),
609                    description: "Core streaming abstraction".to_string(),
610                    pattern: None,
611                    source_example: Some(
612                        "DataStream<String> stream = env.addSource(source);".to_string(),
613                    ),
614                    target_example: Some(
615                        "let stream = StreamBuilder::new().source(source).build();".to_string(),
616                    ),
617                });
618
619                concept_mappings.push(ConceptMapping {
620                    source_name: "KeyedStream".to_string(),
621                    target_name: "GroupedStream".to_string(),
622                    description: "Partitioned stream by key".to_string(),
623                    pattern: None,
624                    source_example: None,
625                    target_example: None,
626                });
627
628                api_mappings.push(APIMapping {
629                    source_api: "stream.keyBy()".to_string(),
630                    target_api: "stream.key_by()".to_string(),
631                    param_mappings: HashMap::new(),
632                    return_type_mapping: None,
633                    notes: "Use closure for key extraction".to_string(),
634                });
635
636                api_mappings.push(APIMapping {
637                    source_api: "stream.process()".to_string(),
638                    target_api: "stream.process()".to_string(),
639                    param_mappings: HashMap::new(),
640                    return_type_mapping: None,
641                    notes: "Implement ProcessFunction trait".to_string(),
642                });
643            }
644
645            SourcePlatform::SparkStreaming => {
646                // Spark Streaming concept mappings
647                concept_mappings.push(ConceptMapping {
648                    source_name: "DStream".to_string(),
649                    target_name: "Stream".to_string(),
650                    description: "Discretized stream".to_string(),
651                    pattern: None,
652                    source_example: Some("val stream = ssc.socketTextStream(host, port)".to_string()),
653                    target_example: Some("let stream = StreamBuilder::new()\n    .source(TcpSource::new(host, port))\n    .build();".to_string()),
654                });
655
656                api_mappings.push(APIMapping {
657                    source_api: "stream.transform()".to_string(),
658                    target_api: "stream.map()".to_string(),
659                    param_mappings: HashMap::new(),
660                    return_type_mapping: None,
661                    notes: "Use map for transformations".to_string(),
662                });
663
664                api_mappings.push(APIMapping {
665                    source_api: "stream.foreachRDD()".to_string(),
666                    target_api: "stream.for_each()".to_string(),
667                    param_mappings: HashMap::new(),
668                    return_type_mapping: None,
669                    notes: "Processes each micro-batch".to_string(),
670                });
671            }
672
673            _ => {
674                // Generic mappings for other platforms
675                concept_mappings.push(ConceptMapping {
676                    source_name: "Stream".to_string(),
677                    target_name: "Stream".to_string(),
678                    description: "Core streaming abstraction".to_string(),
679                    pattern: None,
680                    source_example: None,
681                    target_example: None,
682                });
683            }
684        }
685    }
686
687    async fn scan_source_directory(&self) -> Result<Vec<PathBuf>, StreamError> {
688        let mut files = Vec::new();
689
690        if !self.config.source_dir.exists() {
691            return Ok(files);
692        }
693
694        let extension = match self.config.source_platform {
695            SourcePlatform::KafkaStreams | SourcePlatform::Flink | SourcePlatform::Storm => "java",
696            SourcePlatform::SparkStreaming => "scala",
697            SourcePlatform::PulsarFunctions => "java",
698            SourcePlatform::KinesisAnalytics | SourcePlatform::Dataflow | SourcePlatform::Beam => {
699                "java"
700            }
701            SourcePlatform::Custom(_) => "java",
702        };
703
704        Self::scan_directory_recursive(&self.config.source_dir, extension, &mut files)?;
705
706        Ok(files)
707    }
708
709    fn scan_directory_recursive(
710        dir: &Path,
711        extension: &str,
712        files: &mut Vec<PathBuf>,
713    ) -> Result<(), StreamError> {
714        if dir.is_dir() {
715            for entry in std::fs::read_dir(dir)
716                .map_err(|e| StreamError::Io(format!("Failed to read directory: {}", e)))?
717            {
718                let entry =
719                    entry.map_err(|e| StreamError::Io(format!("Failed to read entry: {}", e)))?;
720                let path = entry.path();
721
722                if path.is_dir() {
723                    Self::scan_directory_recursive(&path, extension, files)?;
724                } else if path.extension().map(|e| e == extension).unwrap_or(false) {
725                    files.push(path);
726                }
727            }
728        }
729
730        Ok(())
731    }
732
733    async fn analyze_file(&self, file_path: &Path) -> Result<FileAnalysis, StreamError> {
734        // In a real implementation, we'd parse the source file
735        // For now, return a mock analysis
736        Ok(FileAnalysis {
737            lines: 100,
738            warnings: vec![MigrationWarning {
739                code: "DEPRECATED_API".to_string(),
740                message: "Some APIs may need manual review".to_string(),
741                file: Some(file_path.to_path_buf()),
742                line: None,
743                suggestion: Some("Check API mappings".to_string()),
744            }],
745            review_items: vec![],
746        })
747    }
748
749    async fn generate_suggestions(&self, report: &MigrationReport) -> Vec<MigrationSuggestion> {
750        let mut suggestions = Vec::new();
751
752        // Performance suggestions
753        suggestions.push(MigrationSuggestion {
754            category: SuggestionCategory::Performance,
755            title: "Use async/await for I/O operations".to_string(),
756            description: "oxirs-stream is built on Tokio async runtime. Ensure all I/O operations use async methods.".to_string(),
757            example: Some("async fn process(event: Event) -> Result<Output, Error> {\n    // Use .await for async operations\n}".to_string()),
758            references: vec!["https://tokio.rs/".to_string()],
759        });
760
761        // Best practice suggestions
762        suggestions.push(MigrationSuggestion {
763            category: SuggestionCategory::BestPractice,
764            title: "Use structured error handling".to_string(),
765            description: "Replace exceptions with Result types for better error propagation.".to_string(),
766            example: Some("fn process() -> Result<(), StreamError> {\n    // Return errors instead of throwing\n}".to_string()),
767            references: vec![],
768        });
769
770        // Rust idiom suggestions
771        if report.files_processed > 0 {
772            suggestions.push(MigrationSuggestion {
773                category: SuggestionCategory::RustIdiom,
774                title: "Use iterators instead of loops".to_string(),
775                description:
776                    "Rust iterators are often more performant and idiomatic than explicit loops."
777                        .to_string(),
778                example: Some("let sum: i32 = values.iter().map(|x| x * 2).sum();".to_string()),
779                references: vec![],
780            });
781        }
782
783        suggestions
784    }
785
786    fn calculate_compatibility_score(&self, report: &MigrationReport) -> f64 {
787        if report.files_processed == 0 {
788            return 100.0;
789        }
790
791        let success_rate = report.successful as f64 / report.files_processed as f64;
792        let warning_penalty = (report.warnings.len() as f64 * 2.0).min(20.0);
793        let error_penalty = (report.errors.len() as f64 * 5.0).min(50.0);
794        let review_penalty = (report.manual_review_items.len() as f64).min(10.0);
795
796        (success_rate * 100.0 - warning_penalty - error_penalty - review_penalty).max(0.0)
797    }
798
799    async fn generate_cargo_toml(&self) -> String {
800        format!(
801            r#"[package]
802name = "migrated-stream"
803version = "0.1.0"
804edition = "{}"
805
806[dependencies]
807oxirs-stream = "0.1"
808tokio = {{ version = "1", features = ["full"] }}
809serde = {{ version = "1", features = ["derive"] }}
810serde_json = "1"
811{}
812"#,
813            self.config.rust_edition,
814            self.config.extra_dependencies.join("\n")
815        )
816    }
817
818    async fn generate_lib_rs(&self) -> String {
819        format!(
820            r#"//! Migrated streaming application from {}
821//! Generated by oxirs-stream migration tool
822
823{}
824pub mod compat;
825
826pub use oxirs_stream::prelude::*;
827
828// Your migrated stream processors go here
829"#,
830            self.platform_name(),
831            if self.config.generate_wrappers {
832                ""
833            } else {
834                "// Compatibility wrappers disabled\n"
835            }
836        )
837    }
838
839    async fn generate_compatibility_wrapper(&self) -> String {
840        match self.config.source_platform {
841            SourcePlatform::KafkaStreams => r#"//! Compatibility wrappers for Kafka Streams API
842
843use oxirs_stream::prelude::*;
844
845/// KStream-like wrapper for familiar API
846pub struct KStreamCompat<K, V> {
847    inner: Stream<(K, V)>,
848}
849
850impl<K, V> KStreamCompat<K, V>
851where
852    K: Clone + Send + Sync + 'static,
853    V: Clone + Send + Sync + 'static,
854{
855    pub fn new(stream: Stream<(K, V)>) -> Self {
856        Self { inner: stream }
857    }
858
859    pub fn map_values<F, V2>(self, f: F) -> KStreamCompat<K, V2>
860    where
861        F: Fn(V) -> V2 + Send + Sync + Clone + 'static,
862        V2: Clone + Send + Sync + 'static,
863    {
864        KStreamCompat {
865            inner: self.inner.map(move |(k, v)| (k, f(v))),
866        }
867    }
868
869    pub fn filter<F>(self, predicate: F) -> Self
870    where
871        F: Fn(&K, &V) -> bool + Send + Sync + Clone + 'static,
872    {
873        Self {
874            inner: self.inner.filter(move |(k, v)| predicate(k, v)),
875        }
876    }
877}
878
879/// KTable-like wrapper for changelog streams and materialized views
880pub struct KTableCompat<K, V> {
881    store: StateStore<K, V>,
882}
883
884impl<K, V> KTableCompat<K, V>
885where
886    K: Clone + Send + Sync + 'static,
887    V: Clone + Send + Sync + 'static,
888{
889    pub fn new(store: StateStore<K, V>) -> Self {
890        Self { store }
891    }
892
893    pub fn map_values<F, V2>(self, f: F) -> KTableCompat<K, V2>
894    where
895        F: Fn(V) -> V2 + Send + Sync + Clone + 'static,
896        V2: Clone + Send + Sync + 'static,
897    {
898        KTableCompat {
899            store: self.store.map_values(f),
900        }
901    }
902
903    pub fn filter<F>(self, predicate: F) -> Self
904    where
905        F: Fn(&K, &V) -> bool + Send + Sync + Clone + 'static,
906    {
907        Self {
908            store: self.store.filter(predicate),
909        }
910    }
911
912    pub fn group_by<F, K2>(self, key_selector: F) -> KTableCompat<K2, Vec<V>>
913    where
914        K2: Clone + Send + Sync + 'static,
915        F: Fn(&K, &V) -> K2 + Send + Sync + Clone + 'static,
916    {
917        KTableCompat {
918            store: self.store.group_by(key_selector),
919        }
920    }
921}
922"#
923            .to_string(),
924            SourcePlatform::Flink => r#"//! Compatibility wrappers for Flink API
925
926use oxirs_stream::prelude::*;
927
928/// DataStream-like wrapper
929pub struct DataStreamCompat<T> {
930    inner: Stream<T>,
931}
932
933impl<T> DataStreamCompat<T>
934where
935    T: Clone + Send + Sync + 'static,
936{
937    pub fn new(stream: Stream<T>) -> Self {
938        Self { inner: stream }
939    }
940
941    pub fn key_by<K, F>(self, key_selector: F) -> KeyedStreamCompat<K, T>
942    where
943        K: Clone + Send + Sync + 'static,
944        F: Fn(&T) -> K + Send + Sync + Clone + 'static,
945    {
946        KeyedStreamCompat {
947            inner: self.inner.key_by(key_selector),
948        }
949    }
950}
951
952/// KeyedStream-like wrapper
953pub struct KeyedStreamCompat<K, T> {
954    inner: GroupedStream<K, T>,
955}
956"#
957            .to_string(),
958            _ => r#"//! Generic compatibility wrappers
959
960use oxirs_stream::prelude::*;
961
962// Add platform-specific wrappers as needed
963"#
964            .to_string(),
965        }
966    }
967
968    async fn generate_tests(&self) -> String {
969        r#"//! Integration tests for migrated application
970
971use oxirs_stream::prelude::*;
972
973#[tokio::test]
974async fn test_basic_stream() {
975    // Add your tests here
976    assert!(true);
977}
978
979#[tokio::test]
980async fn test_window_operations() {
981    // Test window operations
982    assert!(true);
983}
984
985#[tokio::test]
986async fn test_aggregations() {
987    // Test aggregations
988    assert!(true);
989}
990"#
991        .to_string()
992    }
993
994    fn platform_name(&self) -> String {
995        match &self.config.source_platform {
996            SourcePlatform::KafkaStreams => "Kafka Streams".to_string(),
997            SourcePlatform::Flink => "Apache Flink".to_string(),
998            SourcePlatform::SparkStreaming => "Spark Streaming".to_string(),
999            SourcePlatform::Storm => "Apache Storm".to_string(),
1000            SourcePlatform::PulsarFunctions => "Pulsar Functions".to_string(),
1001            SourcePlatform::KinesisAnalytics => "Kinesis Analytics".to_string(),
1002            SourcePlatform::Dataflow => "Google Dataflow".to_string(),
1003            SourcePlatform::Beam => "Apache Beam".to_string(),
1004            SourcePlatform::Custom(name) => name.clone(),
1005        }
1006    }
1007}
1008
/// Helper struct for file analysis
///
/// Groups the line count, warnings and manual-review items gathered for a
/// source file. NOTE(review): presumably one instance per analyzed file; the
/// construction site is outside this view, so confirm there.
struct FileAnalysis {
    /// Line count for the analyzed file (presumably total lines; confirm at the construction site).
    lines: usize,
    /// Migration warnings raised while analyzing the file.
    warnings: Vec<MigrationWarning>,
    /// Items flagged as needing manual review.
    review_items: Vec<ManualReviewItem>,
}
1015
1016/// Quick start helper for common migrations
1017pub struct QuickStart;
1018
1019impl QuickStart {
1020    /// Create a Kafka Streams migration tool
1021    pub fn from_kafka_streams(source_dir: &str, output_dir: &str) -> MigrationTool {
1022        MigrationTool::new(MigrationConfig {
1023            source_platform: SourcePlatform::KafkaStreams,
1024            source_dir: PathBuf::from(source_dir),
1025            output_dir: PathBuf::from(output_dir),
1026            ..Default::default()
1027        })
1028    }
1029
1030    /// Create a Flink migration tool
1031    pub fn from_flink(source_dir: &str, output_dir: &str) -> MigrationTool {
1032        MigrationTool::new(MigrationConfig {
1033            source_platform: SourcePlatform::Flink,
1034            source_dir: PathBuf::from(source_dir),
1035            output_dir: PathBuf::from(output_dir),
1036            ..Default::default()
1037        })
1038    }
1039
1040    /// Create a Spark Streaming migration tool
1041    pub fn from_spark(source_dir: &str, output_dir: &str) -> MigrationTool {
1042        MigrationTool::new(MigrationConfig {
1043            source_platform: SourcePlatform::SparkStreaming,
1044            source_dir: PathBuf::from(source_dir),
1045            output_dir: PathBuf::from(output_dir),
1046            ..Default::default()
1047        })
1048    }
1049}
1050
#[cfg(test)]
mod tests {
    use super::*;

    /// Process-unique path under the system temp dir, `oxirs_mig_<label>_<pid>`.
    ///
    /// Only the path is computed; nothing is created on disk.
    fn temp_path(label: &str) -> PathBuf {
        std::env::temp_dir().join(format!("oxirs_mig_{}_{}", label, std::process::id()))
    }

    /// Process-unique temp "source" path for migration tests (no FS access).
    fn mig_src() -> String {
        temp_path("src").display().to_string()
    }

    /// Process-unique temp "output" path for migration tests (no FS access).
    fn mig_out() -> String {
        temp_path("out").display().to_string()
    }

    #[tokio::test]
    async fn test_migration_tool_creation() {
        let config = MigrationConfig::default();
        let mut tool = MigrationTool::new(config);
        tool.load_default_mappings().await;

        let mappings = tool.get_concept_mappings().await;
        assert!(!mappings.is_empty());
    }

    #[tokio::test]
    async fn test_kafka_streams_mappings() {
        let mut tool = QuickStart::from_kafka_streams(mig_src().as_str(), mig_out().as_str());
        tool.load_default_mappings().await;

        let concept_mappings = tool.get_concept_mappings().await;
        let has_kstream = concept_mappings.iter().any(|m| m.source_name == "KStream");
        assert!(has_kstream);
    }

    #[tokio::test]
    async fn test_flink_mappings() {
        let mut tool = QuickStart::from_flink(mig_src().as_str(), mig_out().as_str());
        tool.load_default_mappings().await;

        let concept_mappings = tool.get_concept_mappings().await;
        let has_datastream = concept_mappings
            .iter()
            .any(|m| m.source_name == "DataStream");
        assert!(has_datastream);
    }

    #[tokio::test]
    async fn test_custom_mapping() {
        let config = MigrationConfig::default();
        let tool = MigrationTool::new(config);

        tool.add_concept_mapping(ConceptMapping {
            source_name: "CustomConcept".to_string(),
            target_name: "OxirsConcept".to_string(),
            description: "Custom mapping".to_string(),
            pattern: None,
            source_example: None,
            target_example: None,
        })
        .await;

        let mappings = tool.get_concept_mappings().await;
        let has_custom = mappings.iter().any(|m| m.source_name == "CustomConcept");
        assert!(has_custom);
    }

    #[tokio::test]
    async fn test_generate_guide() {
        let tool = QuickStart::from_kafka_streams(mig_src().as_str(), mig_out().as_str());

        let guide = tool.generate_guide().await;
        assert!(guide.contains("Migration Guide"));
        assert!(guide.contains("Kafka Streams"));
    }

    #[tokio::test]
    async fn test_analyze_empty_directory() {
        let config = MigrationConfig {
            source_dir: temp_path("nonexistent"),
            output_dir: temp_path("out"),
            ..Default::default()
        };

        let tool = MigrationTool::new(config);
        let report = tool.analyze().await.unwrap();

        assert_eq!(report.files_processed, 0);
        assert_eq!(report.compatibility_score, 100.0);
    }

    #[tokio::test]
    async fn test_api_mappings() {
        let mut tool = QuickStart::from_kafka_streams(mig_src().as_str(), mig_out().as_str());
        tool.load_default_mappings().await;

        let api_mappings = tool.get_api_mappings().await;
        let has_filter = api_mappings.iter().any(|m| m.source_api.contains("filter"));
        assert!(has_filter);
    }

    #[tokio::test]
    async fn test_compatibility_score() {
        // Analyze a path that does not exist so the result never depends on the
        // working directory or on whatever `MigrationConfig::default()` points
        // `source_dir` at (previously this test analyzed that default path).
        let config = MigrationConfig {
            source_dir: temp_path("score_nonexistent"),
            output_dir: temp_path("out"),
            ..Default::default()
        };
        let tool = MigrationTool::new(config);

        let report = tool.analyze().await.unwrap();
        assert!((0.0..=100.0).contains(&report.compatibility_score));
    }

    #[tokio::test]
    async fn test_spark_mappings() {
        let mut tool = QuickStart::from_spark(mig_src().as_str(), mig_out().as_str());
        tool.load_default_mappings().await;

        let mappings = tool.get_concept_mappings().await;
        let has_dstream = mappings.iter().any(|m| m.source_name == "DStream");
        assert!(has_dstream);
    }

    #[tokio::test]
    async fn test_migration_stats() {
        let config = MigrationConfig::default();
        let tool = MigrationTool::new(config);

        let stats = tool.get_stats().await;
        assert_eq!(stats.total_migrations, 0);
    }

    #[tokio::test]
    async fn test_kafka_wrapper_has_no_todo_stubs() {
        let tool = MigrationTool::new(MigrationConfig {
            source_platform: SourcePlatform::KafkaStreams,
            ..Default::default()
        });
        let wrapper = tool.generate_compatibility_wrapper().await;
        assert!(
            !wrapper.contains("todo!()"),
            "Kafka Streams compatibility wrapper must not contain todo!() stubs; \
             found unimplemented body in generated template"
        );
        assert!(
            wrapper.contains("KStreamCompat"),
            "wrapper must include KStreamCompat"
        );
        assert!(
            wrapper.contains("KTableCompat"),
            "wrapper must include KTableCompat"
        );
        assert!(
            wrapper.contains("map_values"),
            "wrapper must include map_values method"
        );
        assert!(
            wrapper.contains("filter"),
            "wrapper must include filter method"
        );
        assert!(
            wrapper.contains("group_by"),
            "wrapper must include group_by method"
        );
    }

    #[tokio::test]
    async fn test_flink_wrapper_has_no_todo_stubs() {
        let tool = MigrationTool::new(MigrationConfig {
            source_platform: SourcePlatform::Flink,
            ..Default::default()
        });
        let wrapper = tool.generate_compatibility_wrapper().await;
        assert!(
            !wrapper.contains("todo!()"),
            "Flink compatibility wrapper must not contain todo!() stubs; \
             found unimplemented body in generated template"
        );
        assert!(
            wrapper.contains("DataStreamCompat"),
            "wrapper must include DataStreamCompat"
        );
        assert!(
            wrapper.contains("key_by"),
            "wrapper must include key_by method"
        );
        assert!(
            wrapper.contains("KeyedStreamCompat"),
            "wrapper must include KeyedStreamCompat"
        );
    }
}