use super::error::{ConductorError, ConductorResult};
use crate::conductor::state::{AppInterfaceConfig, AppInterfaceId};
use crate::conductor::state::{ConductorState, ConductorStateTag};
use holochain_conductor_api::signal_subscription::SignalSubscription;
use holochain_state::conductor::ConductorStateSnapshot;
use holochain_types::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
pub fn snapshot_to_state(snapshot: ConductorStateSnapshot) -> ConductorResult<ConductorState> {
let tag = ConductorStateTag(Arc::from(snapshot.tag.as_str()));
let mut installed_apps = InstalledAppMap::new();
for (app_id, app_common, status) in snapshot.installed_apps {
let mut installed_app = InstalledApp::new_fresh(app_common);
installed_app.status = status;
installed_apps.insert(InstalledAppId::from(app_id), installed_app);
}
let mut app_interfaces = HashMap::new();
for (model, subs_data) in snapshot.app_interfaces {
let driver = model.to_driver().map_err(ConductorError::other)?;
let mut signal_subscriptions = HashMap::new();
for (app_id, filters_blob) in subs_data {
let subscription: SignalSubscription =
serde_json::from_slice(&filters_blob).map_err(|e| {
ConductorError::other(format!(
"Failed to deserialize signal subscription: {}",
e
))
})?;
signal_subscriptions.insert(InstalledAppId::from(app_id), subscription);
}
let config = AppInterfaceConfig {
signal_subscriptions,
installed_app_id: model.installed_app_id.clone(),
driver,
};
let interface_id = if model.port == 0 {
if model.id.is_empty() {
return Err(ConductorError::other("Port 0 interface missing ID"));
}
AppInterfaceId::from_parts(0, Some(model.id.clone()))
} else {
AppInterfaceId::new(model.port as u16)
};
app_interfaces.insert(interface_id, config);
}
Ok(ConductorState::from_parts(
tag,
installed_apps,
app_interfaces,
))
}
pub fn state_to_snapshot(state: &ConductorState) -> ConductorResult<ConductorStateSnapshot> {
let tag = state.tag().0.as_ref().to_string();
let installed_apps = state
.installed_apps()
.iter()
.map(|(app_id, installed_app)| {
(
app_id.to_string(),
installed_app.as_ref().clone(),
installed_app.status.clone(),
)
})
.collect();
let mut app_interfaces = Vec::with_capacity(state.app_interfaces.len());
for (interface_id, config) in &state.app_interfaces {
let mut model = holochain_data::conductor::AppInterfaceModel::from_driver(
&config.driver,
config.installed_app_id.as_ref().map(|id| id.to_string()),
)
.map_err(ConductorError::other)?;
let mut subscriptions = Vec::with_capacity(config.signal_subscriptions.len());
for (app_id, subscription) in &config.signal_subscriptions {
let filters_blob = serde_json::to_vec(subscription).map_err(|e| {
ConductorError::other(format!("Failed to serialize signal subscription: {}", e))
})?;
subscriptions.push((app_id.to_string(), filters_blob));
}
model.port = interface_id.port() as i64;
model.id = interface_id.id().as_deref().unwrap_or("").to_string();
app_interfaces.push((model, subscriptions));
}
Ok(ConductorStateSnapshot {
tag,
installed_apps,
app_interfaces,
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::conductor::state::AppInterfaceConfig;
use holochain_state::conductor::ConductorStore;
use holochain_types::websocket::AllowedOrigins;
async fn state_to_store(
store: &ConductorStore,
state: ConductorState,
) -> ConductorResult<ConductorState> {
let snapshot = state_to_snapshot(&state)?;
store
.update_state(|_| -> ConductorResult<_> { Ok((snapshot, ())) })
.await?;
let loaded_snapshot = store.as_read().load_state().await?.unwrap();
let loaded = snapshot_to_state(loaded_snapshot)?;
Ok(loaded)
}
#[tokio::test]
async fn state_persistence_round_trip() {
let store = ConductorStore::new_test().await.unwrap();
let tag = ConductorStateTag(Arc::from("test-conductor"));
let state = ConductorState::from_parts(tag, InstalledAppMap::new(), HashMap::new());
let loaded = state_to_store(&store, state).await.unwrap();
assert_eq!(loaded.tag().0.as_ref(), "test-conductor");
assert_eq!(loaded.installed_apps().len(), 0);
assert_eq!(loaded.app_interfaces.len(), 0);
}
#[tokio::test]
async fn app_interface_persistence() {
let store = ConductorStore::new_test().await.unwrap();
let tag = ConductorStateTag(Arc::from("test-conductor"));
let mut app_interfaces = HashMap::new();
let interface_config =
AppInterfaceConfig::websocket(12345, None, AllowedOrigins::Any, None);
let interface_id = AppInterfaceId::new(12345);
app_interfaces.insert(interface_id, interface_config);
let state = ConductorState::from_parts(tag, InstalledAppMap::new(), app_interfaces);
let loaded = state_to_store(&store, state).await.unwrap();
assert_eq!(loaded.app_interfaces.len(), 1);
let loaded_interface = loaded.app_interfaces.values().next().unwrap();
assert_eq!(loaded_interface.driver.port(), 12345);
}
#[tokio::test]
async fn deletion_of_stale_interfaces() {
let store = ConductorStore::new_test().await.unwrap();
let tag = ConductorStateTag(Arc::from("test-conductor"));
let mut app_interfaces = HashMap::new();
app_interfaces.insert(
AppInterfaceId::new(12345),
AppInterfaceConfig::websocket(12345, None, AllowedOrigins::Any, None),
);
app_interfaces.insert(
AppInterfaceId::new(12346),
AppInterfaceConfig::websocket(12346, None, AllowedOrigins::Any, None),
);
let state = ConductorState::from_parts(tag.clone(), InstalledAppMap::new(), app_interfaces);
let _ = state_to_store(&store, state).await.unwrap();
let mut app_interfaces = HashMap::new();
app_interfaces.insert(
AppInterfaceId::new(12345),
AppInterfaceConfig::websocket(12345, None, AllowedOrigins::Any, None),
);
let new_state = ConductorState::from_parts(tag, InstalledAppMap::new(), app_interfaces);
let loaded = state_to_store(&store, new_state).await.unwrap();
assert_eq!(loaded.app_interfaces.len(), 1);
assert!(loaded
.app_interfaces
.contains_key(&AppInterfaceId::new(12345)));
}
}