use std::sync::Arc;
use narwhal_config::{DynCredentialStore, VaultRegistry};
use narwhal_core::{ConnectionConfig, DynDatabaseDriver, TableSchema};
use narwhal_domain::SchemaListing;
use narwhal_history::HistoryEntry;
use secrecy::{ExposeSecret, SecretString};
use uuid::Uuid;
use crate::session::{Session, SessionOpenOptions};
#[non_exhaustive]
pub enum MetaRequest {
DumpSchemaAll {
tab_id: u64,
},
RefreshSchemas {
session_id: Uuid,
},
LoadHistory {
limit: usize,
},
OpenSession {
driver: Arc<dyn DynDatabaseDriver>,
config: Box<ConnectionConfig>,
password_hint: Option<String>,
opts: SessionOpenOptions,
},
TestConnection {
driver: Arc<dyn DynDatabaseDriver>,
config: Box<ConnectionConfig>,
password: Option<String>,
opts: SessionOpenOptions,
label: String,
},
}
impl std::fmt::Debug for MetaRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::DumpSchemaAll { tab_id } => f
.debug_struct("DumpSchemaAll")
.field("tab_id", tab_id)
.finish(),
Self::RefreshSchemas { session_id } => f
.debug_struct("RefreshSchemas")
.field("session_id", session_id)
.finish(),
Self::LoadHistory { limit } => {
f.debug_struct("LoadHistory").field("limit", limit).finish()
}
Self::OpenSession { config, opts, .. } => f
.debug_struct("OpenSession")
.field("config.name", &config.name)
.field("opts", opts)
.finish(),
Self::TestConnection { label, opts, .. } => f
.debug_struct("TestConnection")
.field("label", label)
.field("opts", opts)
.finish(),
}
}
}
#[derive(Debug)]
#[non_exhaustive]
pub enum MetaUpdate {
DumpSchemaReady {
tab_id: u64,
tables: Vec<TableSchema>,
},
SchemasRefreshed {
session_id: Uuid,
schemas: Vec<SchemaListing>,
},
HistoryReady {
entries: Vec<HistoryEntry>,
},
SessionOpened {
config_id: Uuid,
result: Result<Box<Session>, String>,
},
MetaFailed {
message: String,
},
TestCompleted {
label: String,
result: Result<String, String>,
},
CredentialReady {
connection_id: Uuid,
password: Option<SecretString>,
},
ForgetCompleted {
name: String,
result: Result<(), String>,
},
InjectDdlReady {
tab_id: u64,
schema: String,
name: String,
ddl: String,
},
}
pub fn spawn_meta_request(
request: MetaRequest,
pool: Option<narwhal_pool::Pool>,
history: Option<Arc<narwhal_history::Journal>>,
credentials: Option<Arc<dyn DynCredentialStore>>,
vault: Option<Arc<VaultRegistry>>,
tx: tokio::sync::mpsc::Sender<MetaUpdate>,
) {
tokio::spawn(async move {
let update = match request {
MetaRequest::DumpSchemaAll { tab_id } => {
let Some(pool) = pool else {
let _ = tx
.send(MetaUpdate::MetaFailed {
message: "no active connection".into(),
})
.await;
return;
};
match dump_schema_all(&pool).await {
Ok(tables) => MetaUpdate::DumpSchemaReady { tab_id, tables },
Err(e) => MetaUpdate::MetaFailed {
message: format!("dump-schema failed: {e}"),
},
}
}
MetaRequest::RefreshSchemas { session_id } => {
let Some(pool) = pool else {
let _ = tx
.send(MetaUpdate::MetaFailed {
message: "no active connection".into(),
})
.await;
return;
};
match refresh_schemas_via_pool(&pool).await {
Ok(schemas) => MetaUpdate::SchemasRefreshed {
session_id,
schemas,
},
Err(e) => MetaUpdate::MetaFailed {
message: format!("refresh failed: {e}"),
},
}
}
MetaRequest::OpenSession {
driver,
config,
password_hint,
opts,
} => {
let config_id = config.id;
let password = match password_hint {
Some(p) => Some(p),
None => {
resolve_password(credentials.as_deref(), vault.as_deref(), &config).await
}
};
let result = match Session::open_with(
Arc::clone(&driver),
(*config).clone(),
password,
opts,
)
.await
{
Ok(mut session) => {
if let Err(error) = session.refresh_schemas().await {
tracing::debug!(
target: "narwhal::meta",
error = %error,
"initial schema refresh failed after open; continuing"
);
}
Ok(Box::new(session))
}
Err(error) => Err(error.to_string()),
};
MetaUpdate::SessionOpened { config_id, result }
}
MetaRequest::TestConnection {
driver,
config,
password,
opts,
label,
} => {
let resolved = match password {
Some(p) => Some(p),
None => {
resolve_password(credentials.as_deref(), vault.as_deref(), &config).await
}
};
let result = match Session::open_with(
Arc::clone(&driver),
(*config).clone(),
resolved,
opts,
)
.await
{
Ok(session) => {
let driver_name = session.driver.name().to_owned();
drop(session);
Ok(driver_name)
}
Err(e) => Err(e.to_string()),
};
MetaUpdate::TestCompleted { label, result }
}
MetaRequest::LoadHistory { limit } => {
let Some(journal) = history else {
let _ = tx
.send(MetaUpdate::MetaFailed {
message: "history disabled".into(),
})
.await;
return;
};
match journal.recent(limit).await {
Ok(mut entries) => {
entries.reverse();
MetaUpdate::HistoryReady { entries }
}
Err(e) => MetaUpdate::MetaFailed {
message: format!("history read failed: {e}"),
},
}
}
};
let _ = tx.send(update).await;
});
}
async fn resolve_password(
credentials: Option<&dyn DynCredentialStore>,
vault: Option<&VaultRegistry>,
config: &ConnectionConfig,
) -> Option<String> {
match narwhal_config::resolve_connection_password(config, vault, credentials).await {
Ok(Some(secret)) => Some(secret.expose_secret().to_owned()),
Ok(None) => None,
Err(error) => {
tracing::warn!(
target: "narwhal::meta",
connection = %config.name,
%error,
"password resolution failed; connect will proceed without a password",
);
None
}
}
}
async fn dump_schema_all(
pool: &narwhal_pool::Pool,
) -> narwhal_core::error::Result<Vec<TableSchema>> {
let mut conn = pool
.acquire()
.await
.map_err(|e| narwhal_core::Error::Connection(e.to_string()))?;
let schemas = conn.list_all_tables().await?;
let mut out = Vec::new();
for (schema, tables) in &schemas {
for table in tables {
match conn.describe_table(&schema.name, &table.name).await {
Ok(ts) => out.push(ts),
Err(e) => {
tracing::warn!(
target: "narwhal::meta",
schema = %schema.name,
table = %table.name,
error = %e,
"describe_table failed during dump_schema all; skipping"
);
}
}
}
}
Ok(out)
}
async fn refresh_schemas_via_pool(
pool: &narwhal_pool::Pool,
) -> narwhal_core::error::Result<Vec<SchemaListing>> {
let mut conn = pool
.acquire()
.await
.map_err(|e| narwhal_core::Error::Connection(e.to_string()))?;
let mut listing = conn.list_all_tables().await?;
if listing.is_empty() {
if let Ok(tables) = conn.list_tables("").await {
listing.push((
narwhal_core::Schema {
name: String::new(),
},
tables,
));
}
}
Ok(listing)
}