use crate::sync::DataSyncBackend;
use crate::sync::{BackendConfig, TransportConfig};
use crate::{Error, Result};
use std::collections::HashMap;
use std::sync::Arc;
#[cfg(feature = "automerge-backend")]
use std::time::Duration;
#[cfg(feature = "automerge-backend")]
#[allow(unused_imports)]
use tracing::{debug, info};
#[cfg(feature = "automerge-backend")]
use crate::network::IrohTransport;
#[cfg(feature = "automerge-backend")]
use crate::storage::AutomergeStore;
#[cfg(feature = "automerge-backend")]
use crate::sync::automerge::AutomergeIrohBackend;
pub struct E2EHarness {
pub name: String,
temp_dirs: Vec<tempfile::TempDir>,
#[cfg(feature = "automerge-backend")]
test_secret: String,
}
impl E2EHarness {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
temp_dirs: Vec::new(),
#[cfg(feature = "automerge-backend")]
test_secret: crate::security::FormationKey::generate_secret(),
}
}
pub fn allocate_tcp_port() -> std::io::Result<u16> {
use std::net::{SocketAddr, TcpListener};
let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
let port = listener.local_addr()?.port();
drop(listener);
Ok(port)
}
#[cfg(feature = "automerge-backend")]
pub async fn create_automerge_backend(&mut self) -> Result<Arc<AutomergeIrohBackend>> {
self.create_automerge_backend_with_bind(None).await
}
#[cfg(feature = "automerge-backend")]
pub async fn create_automerge_backend_with_bind(
&mut self,
bind_addr: Option<std::net::SocketAddr>,
) -> Result<Arc<AutomergeIrohBackend>> {
let temp_dir = tempfile::tempdir().map_err(|e| {
Error::storage_error(
format!("Failed to create temp dir: {}", e),
"test_setup",
None,
)
})?;
let store = Arc::new(AutomergeStore::open(temp_dir.path()).map_err(|e| {
Error::storage_error(
format!("Failed to create AutomergeStore: {}", e),
"test_setup",
None,
)
})?);
let transport = if let Some(addr) = bind_addr {
Arc::new(IrohTransport::bind(addr).await.map_err(|e| {
Error::network_error(format!("Failed to bind Iroh transport: {}", e), None)
})?)
} else {
Arc::new(IrohTransport::new().await.map_err(|e| {
Error::network_error(format!("Failed to create Iroh transport: {}", e), None)
})?)
};
let backend = Arc::new(AutomergeIrohBackend::from_parts(store, transport));
let config = BackendConfig {
app_id: "automerge-test".to_string(),
persistence_dir: temp_dir.path().to_path_buf(),
shared_key: Some(self.test_secret.clone()),
transport: TransportConfig::default(),
extra: HashMap::new(),
};
backend.initialize(config).await?;
self.temp_dirs.push(temp_dir);
Ok(backend)
}
#[cfg(feature = "automerge-backend")]
pub async fn connect_backends_now(
&self,
backend_a: &Arc<AutomergeIrohBackend>,
backend_b: &Arc<AutomergeIrohBackend>,
) -> Result<()> {
tokio::time::sleep(Duration::from_millis(50)).await;
let (result_a, result_b) = tokio::join!(
backend_a.connect_to_discovered_peers_now(),
backend_b.connect_to_discovered_peers_now()
);
let count_a = result_a.unwrap_or(0);
let count_b = result_b.unwrap_or(0);
if count_a > 0 || count_b > 0 {
debug!(
"Fast connect: A made {} new, B made {} new",
count_a, count_b
);
return Ok(());
}
for i in 0..5 {
tokio::time::sleep(Duration::from_millis(100)).await;
let (result_a, result_b) = tokio::join!(
backend_a.connect_to_discovered_peers_now(),
backend_b.connect_to_discovered_peers_now()
);
if result_a.unwrap_or(0) > 0 || result_b.unwrap_or(0) > 0 {
debug!("Fast connect succeeded on retry {}", i + 1);
return Ok(());
}
let transport_a = backend_a.transport();
let transport_b = backend_b.transport();
if !transport_a.connected_peers().is_empty()
|| !transport_b.connected_peers().is_empty()
{
debug!("Already connected on retry {}", i + 1);
return Ok(());
}
}
Err(Error::network_error(
"Failed to establish connection between backends after retries",
None,
))
}
#[cfg(feature = "automerge-backend")]
pub async fn wait_for_automerge_connection(
&self,
backend_a: &Arc<AutomergeIrohBackend>,
backend_b: &Arc<AutomergeIrohBackend>,
timeout_duration: Duration,
) -> Result<()> {
let start = std::time::Instant::now();
while start.elapsed() < timeout_duration {
let _ = backend_a.connect_to_discovered_peers_now().await;
let _ = backend_b.connect_to_discovered_peers_now().await;
let transport_a = backend_a.transport();
let transport_b = backend_b.transport();
if !transport_a.connected_peers().is_empty()
|| !transport_b.connected_peers().is_empty()
{
info!("Connection established in {:?}", start.elapsed());
return Ok(());
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
Err(Error::network_error(
format!("Connection timeout after {:?}", timeout_duration),
None,
))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_harness_creation() {
let harness = E2EHarness::new("test_scenario");
assert_eq!(harness.name, "test_scenario");
assert_eq!(harness.temp_dirs.len(), 0);
}
}