use fortress_core::prelude::*;
use std::collections::HashMap;
use tokio;
/// Entry point for the Fortress database synchronization example.
///
/// Builds a push/pull manager together with a MongoDB and a PostgreSQL
/// configuration, initializes both databases, seeds MongoDB with sample
/// data, and then runs — in this order — a push, a pull, the MongoDB
/// feature demo, the PostgreSQL feature demo and the bidirectional sync demo.
///
/// # Errors
///
/// The first error produced by any step is propagated with `?` and ends the
/// run.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    println!("Fortress Database Synchronization Example");
    println!("==========================================");

    // Settings handed to the manager that drives the push and pull operations.
    let sync_config = PushPullConfig {
        max_batch_size: 100,
        max_concurrent_ops: 5,
        timeout_seconds: 60,
        enable_compression: true,
        conflict_resolution: ConflictResolution::Timestamp,
        enable_incremental: true,
        verify_checksums: true,
        enable_progress: true,
    };
    let sync_manager = PushPullManager::new(sync_config);

    // MongoDB side of the example.
    let mongo_settings = MongoConfig {
        connection_string: "mongodb://localhost:27017".to_string(),
        database_name: "fortress_source".to_string(),
        keys_collection: "fortress_keys".to_string(),
        data_collection: "fortress_data".to_string(),
        max_pool_size: 10,
        tls_enabled: false,
        auth_database: None,
        replica_set: None,
        read_preference: MongoReadPreference::Primary,
        write_concern: MongoWriteConcern::Acknowledged,
    };

    // PostgreSQL side of the example.
    let postgres_settings = PostgresConfig {
        connection_string: "postgresql://localhost:5432/fortress_target".to_string(),
        database_name: "fortress_target".to_string(),
        schema: "public".to_string(),
        keys_table: "fortress_keys".to_string(),
        data_table: "fortress_data".to_string(),
        max_connections: 20,
        connection_timeout_seconds: 30,
        ssl_enabled: false,
        log_statements: false,
        enable_pooling: true,
        partitioning: Some(PostgresPartitioning::ByDate {
            column: "created_at".to_string(),
            interval: "1 day".to_string(),
        }),
        replication: PostgresReplicationConfig {
            streaming_enabled: false,
            slot_name: None,
            publication_name: None,
            sync_mode: PostgresSyncMode::Asynchronous,
        },
    };

    // Open and initialize both databases. The configs are cloned because the
    // originals are still needed for the push/pull requests below.
    println!("Initializing databases...");
    let mongo_store = MongoKeyDatabase::new(mongo_settings.clone()).await?;
    mongo_store.initialize().await?;
    println!("MongoDB initialized");
    let postgres_store = PostgresKeyDatabase::new(postgres_settings.clone()).await?;
    postgres_store.initialize().await?;
    println!("PostgreSQL initialized");

    println!("Adding sample data to MongoDB...");
    add_sample_data_to_mongo(&mongo_store).await?;

    println!("Pushing data from MongoDB to PostgreSQL...");
    let push_outcome =
        push_mongo_to_postgres(&sync_manager, &mongo_settings, &postgres_settings).await?;
    print_operation_result(&push_outcome);

    println!("🔄 Pulling data from PostgreSQL to MongoDB...");
    let pull_outcome =
        pull_postgres_to_mongo(&sync_manager, &postgres_settings, &mongo_settings).await?;
    print_operation_result(&pull_outcome);

    println!("🔍 Demonstrating advanced MongoDB operations...");
    demonstrate_mongo_features(&mongo_store).await?;

    println!("🔍 Demonstrating advanced PostgreSQL operations...");
    demonstrate_postgres_features(&postgres_store).await?;

    println!("Demonstrating bidirectional synchronization...");
    demonstrate_bidirectional_sync(&sync_manager, &mongo_settings, &postgres_settings).await?;

    println!("Database synchronization example completed successfully!");
    Ok(())
}
/// Seeds MongoDB with 50 generated sample entries.
///
/// Entries are keyed `item_1` through `item_50`. Even-numbered items carry a
/// short text payload and are tagged `type = "document"`; odd-numbered items
/// carry 100 generated bytes and are tagged `type = "binary"`. Every entry
/// also gets a `category` (`category_0` to `category_4`, from `i % 5`) and a
/// `priority` (`high` when `i` is divisible by 3, otherwise `normal`).
///
/// All entries are sent in a single `push_bulk` call and the count it
/// returns is printed.
///
/// # Errors
///
/// Propagates any error returned by `push_bulk`.
async fn add_sample_data_to_mongo(mongo_db: &MongoKeyDatabase) -> Result<()> {
    let entries: Vec<_> = (1..=50)
        .map(|i| {
            let is_even = i % 2 == 0;
            let kind = if is_even { "document" } else { "binary" };
            let priority = if i % 3 == 0 { "high" } else { "normal" };

            let metadata = HashMap::from([
                ("type".to_string(), kind.to_string()),
                ("category".to_string(), format!("category_{}", i % 5)),
                ("priority".to_string(), priority.to_string()),
            ]);

            let payload: Vec<u8> = if is_even {
                format!("Document content for item {}", i).into_bytes()
            } else {
                // The `as u8` cast keeps only the low byte of each product;
                // the values are just filler for a binary-looking payload.
                (0..100).map(|j| (i * j) as u8).collect()
            };

            (format!("item_{}", i), payload, metadata)
        })
        .collect();

    let inserted = mongo_db.push_bulk(entries).await?;
    println!("📈 Added {} sample items to MongoDB", inserted);
    Ok(())
}
/// Submits a push from the MongoDB source to the PostgreSQL target.
///
/// Builds a `PushRequest` with a UUID-suffixed operation id
/// (`mongo_to_postgres_<uuid>`), the `PushFilter::All` filter, a
/// `PushPullConfig::default()` config and the current UTC time as
/// `started_at`, then passes it to `manager.push`.
///
/// Both configs are cloned into the request, so the caller keeps ownership
/// of its own copies.
///
/// # Errors
///
/// Propagates any error returned by `manager.push`.
async fn push_mongo_to_postgres(
    manager: &PushPullManager,
    mongo_config: &MongoConfig,
    postgres_config: &PostgresConfig,
) -> Result<PushPullResult> {
    let operation_id = format!("mongo_to_postgres_{}", uuid::Uuid::new_v4());
    let source = StorageSource::Mongo {
        config: mongo_config.clone(),
    };
    let target = StorageTarget::Postgres {
        config: postgres_config.clone(),
    };

    let request = PushRequest {
        operation_id,
        source,
        target,
        filter: PushFilter::All,
        config: PushPullConfig::default(),
        started_at: chrono::Utc::now(),
    };

    let outcome = manager.push(request).await?;
    Ok(outcome)
}
/// Submits a pull from the PostgreSQL source to the MongoDB target.
///
/// Builds a `PullRequest` with a UUID-suffixed operation id
/// (`postgres_to_mongo_<uuid>`), a `PullFilter::DateRange` running from one
/// hour before the current UTC time up to the current UTC time, a
/// `PushPullConfig::default()` config and the current UTC time as
/// `started_at`, then passes it to `manager.pull`.
///
/// The clock is read separately for the range start, the range end and
/// `started_at`, so the three instants may differ by a tiny amount.
///
/// # Errors
///
/// Propagates any error returned by `manager.pull`.
async fn pull_postgres_to_mongo(
    manager: &PushPullManager,
    postgres_config: &PostgresConfig,
    mongo_config: &MongoConfig,
) -> Result<PushPullResult> {
    let operation_id = format!("postgres_to_mongo_{}", uuid::Uuid::new_v4());
    let source = StorageSource::Postgres {
        config: postgres_config.clone(),
    };
    let target = StorageTarget::Mongo {
        config: mongo_config.clone(),
    };
    // Covers the most recent hour.
    let last_hour = PullFilter::DateRange {
        start: chrono::Utc::now() - chrono::Duration::hours(1),
        end: chrono::Utc::now(),
    };

    let request = PullRequest {
        operation_id,
        source,
        target,
        filter: last_hour,
        config: PushPullConfig::default(),
        started_at: chrono::Utc::now(),
    };

    let outcome = manager.pull(request).await?;
    Ok(outcome)
}
/// Prints a human-readable summary of a push/pull result to stdout.
///
/// The operation type, success flag, item counters and bytes transferred are
/// always printed. The duration and error message are printed only when
/// present, and the number of progress updates only when there is at least
/// one. A blank line ends the summary.
fn print_operation_result(result: &PushPullResult) {
    println!("Operation Result:");
    println!(" Type: {:?}", result.operation_type);
    println!(" Success: {}", result.success);
    println!(" Items Processed: {}", result.items_processed);
    println!(" Items Succeeded: {}", result.items_succeeded);
    println!(" Items Failed: {}", result.items_failed);
    println!(" Bytes Transferred: {}", result.bytes_transferred);

    // Optional details.
    if let Some(seconds) = result.duration_seconds {
        println!(" Duration: {} seconds", seconds);
    }
    if let Some(message) = result.error_message.as_ref() {
        println!(" Error: {}", message);
    }

    let update_count = result.progress_updates.len();
    if update_count != 0 {
        println!(" Progress Updates: {}", update_count);
    }

    println!();
}
/// Exercises three MongoDB-specific calls and prints what each returns.
///
/// 1. `aggregate` with a pipeline whose operation is `"count_by_type"` and
///    whose stage list is empty; the result is printed with `{:?}`.
/// 2. `text_search` for `"document"` with `Some(5)` as the second argument;
///    the number of hits is printed.
/// 3. `pull_filtered` with a `Prefix("item_1")` filter; the number of
///    returned items is printed.
///
/// # Errors
///
/// Propagates the first error returned by any of the three calls.
async fn demonstrate_mongo_features(mongo_db: &MongoKeyDatabase) -> Result<()> {
    // Aggregation.
    println!(" Testing MongoDB aggregation...");
    let count_by_type = MongoPipeline {
        operation: "count_by_type".to_string(),
        stages: vec![],
    };
    let aggregated = mongo_db.aggregate(count_by_type).await?;
    println!(" Aggregation results: {:?}", aggregated);

    // Text search.
    println!(" 🔍 Testing MongoDB text search...");
    let hits = mongo_db.text_search("document", Some(5)).await?;
    println!(" Search results: {} items found", hits.len());

    // Prefix-filtered pull.
    println!(" 🎯 Testing filtered pull...");
    let prefix_filter = MongoPullFilter::Prefix("item_1".to_string());
    let matching = mongo_db.pull_filtered(prefix_filter).await?;
    println!(" Filtered results: {} items", matching.len());

    Ok(())
}
async fn demonstrate_postgres_features(postgres_db: &PostgresKeyDatabase) -> Result<()> {
println!(" Testing PostgreSQL cursor query...");
let query = PostgresQuery {
key_filter: Some("item_".to_string()),
date_start: None,
date_end: None,
min_size: None,
max_size: None,
content_type: None,
offset: Some(0),
limit: Some(10),
};
let cursor = postgres_db.pull_cursor(query).await?;
println!(" Cursor results: {} items", cursor.results.len());
println!(" 🔍 Testing PostgreSQL full-text search...");
let search_results = postgres_db.full_text_search("content", Some(5)).await?;
println!(" Search results: {} items found", search_results.len());
println!(" 🎯 Testing PostgreSQL JSONB query...");
let jsonb_query = PostgresJsonbQuery::Equals {
path: "priority",
value: serde_json::Value::String("high".to_string()),
};
let jsonb_results = postgres_db.jsonb_query(jsonb_query).await?;
println!(" JSONB query results: {} items", jsonb_results.len());
println!(" 📦 Testing PostgreSQL bulk operations...");
let bulk_entries = vec![
PostgresBulkEntry {
key: "bulk_test_1".to_string(),
data: b"Bulk test data 1".to_vec(),
metadata: {
let mut meta = HashMap::new();
meta.insert("bulk".to_string(), "true".to_string());
meta
},
content_type: "text/plain".to_string(),
encoding: "utf-8".to_string(),
compression: "none".to_string(),
partition_key: Some("test_partition".to_string()),
},
PostgresBulkEntry {
key: "bulk_test_2".to_string(),
data: b"Bulk test data 2".to_vec(),
metadata: {
let mut meta = HashMap::new();
meta.insert("bulk".to_string(), "true".to_string());
meta
},
content_type: "text/plain".to_string(),
encoding: "utf-8".to_string(),
compression: "none".to_string(),
partition_key: Some("test_partition".to_string()),
},
];
let bulk_count = postgres_db.push_bulk_copy(bulk_entries).await?;
println!(" Bulk operations: {} items processed", bulk_count);
Ok(())
}
/// Demonstrates a round trip: MongoDB to PostgreSQL, then PostgreSQL to MongoDB.
///
/// Steps, in order:
/// 1. Open a MongoDB handle from `mongo_config` and bulk-insert
///    `sync_test_1` and `sync_test_2` with empty metadata.
/// 2. Push MongoDB to PostgreSQL via `push_mongo_to_postgres`.
/// 3. Open a PostgreSQL handle from `postgres_config` and bulk-copy
///    `sync_test_3`.
/// 4. Pull PostgreSQL to MongoDB via `pull_postgres_to_mongo`.
/// 5. Query both stores for keys matching `sync_test` and print the counts.
///
/// This function does not call `initialize` on the handles it opens;
/// presumably the caller has already initialized both databases — confirm.
///
/// # Errors
///
/// Propagates the first error returned by any database or manager call.
async fn demonstrate_bidirectional_sync(
    manager: &PushPullManager,
    mongo_config: &MongoConfig,
    postgres_config: &PostgresConfig,
) -> Result<()> {
    // Step 1: new records on the MongoDB side.
    let mongo_store = MongoKeyDatabase::new(mongo_config.clone()).await?;
    let mongo_additions = vec![
        (
            "sync_test_1".to_string(),
            b"Sync test data 1".to_vec(),
            HashMap::new(),
        ),
        (
            "sync_test_2".to_string(),
            b"Sync test data 2".to_vec(),
            HashMap::new(),
        ),
    ];
    mongo_store.push_bulk(mongo_additions).await?;
    println!(" Added new test data to MongoDB");

    // Step 2: MongoDB -> PostgreSQL.
    let pushed = push_mongo_to_postgres(manager, mongo_config, postgres_config).await?;
    println!(" 📤 Push result: {} items processed", pushed.items_processed);

    // Step 3: a new record on the PostgreSQL side.
    let postgres_store = PostgresKeyDatabase::new(postgres_config.clone()).await?;
    let postgres_additions = vec![PostgresBulkEntry {
        key: "sync_test_3".to_string(),
        data: b"Sync test data 3".to_vec(),
        metadata: HashMap::new(),
        content_type: "text/plain".to_string(),
        encoding: "utf-8".to_string(),
        compression: "none".to_string(),
        partition_key: None,
    }];
    postgres_store.push_bulk_copy(postgres_additions).await?;
    println!(" Added new test data to PostgreSQL");

    // Step 4: PostgreSQL -> MongoDB.
    let pulled = pull_postgres_to_mongo(manager, postgres_config, mongo_config).await?;
    println!(" 📥 Pull result: {} items processed", pulled.items_processed);

    // Step 5: count the sync_test records visible in each store.
    let mongo_prefix = MongoPullFilter::Prefix("sync_test".to_string());
    let mongo_items = mongo_store.pull_filtered(mongo_prefix).await?;

    let unbounded_query = PostgresQuery {
        key_filter: Some("sync_test".to_string()),
        date_start: None,
        date_end: None,
        min_size: None,
        max_size: None,
        content_type: None,
        offset: None,
        limit: None,
    };
    let postgres_items = postgres_store.pull_cursor(unbounded_query).await?;

    println!(" Final verification:");
    println!(" MongoDB items: {}", mongo_items.len());
    println!(" PostgreSQL items: {}", postgres_items.results.len());
    Ok(())
}