pub mod config;
pub mod edit;
pub mod exec;
pub mod introspect;
pub mod tunnel;
pub use config::{PgAuthMethod, PgConfig, PgTlsMode, SshTunnelRef};
pub use edit::{InsertColumnInput, InsertedRow, UpdateOutcome};
pub use exec::{ActiveCursor, ColumnMeta, ExecutionOutcome, PageResult};
pub use introspect::{
ColumnDetail, DbSummary, ObjectType, ObjectTypeKind, Relation, RelationKind, Routine,
RoutineKind, SchemaContents, SchemaSummary, Sequence,
};
pub use tunnel::SshTunnel;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use rustls::ClientConfig as RustlsClientConfig;
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio_postgres::config::SslMode as PgSslMode;
use tokio_postgres::{CancelToken, Client, Config as PgDriverConfig};
use tokio_postgres_rustls::MakeRustlsConnect;
use tokio_util::sync::CancellationToken;
use crate::ssh::SshClient;
const DEFAULT_MAX_POOL_SIZE: usize = 5;
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(300);
const EVICTION_INTERVAL: Duration = Duration::from_secs(30);
const DEFAULT_MIN_IDLE_CONNECTIONS: usize = 1;
pub const BROWSER_SESSION_ID: &str = "_browser";
#[derive(Debug, thiserror::Error)]
pub enum PgError {
#[error("postgres connect failed: {0}")]
Connect(String),
#[error("postgres auth failed: {0}")]
Auth(String),
#[error("postgres tls setup failed: {0}")]
Tls(String),
#[error("ssh tunnel error: {0}")]
Tunnel(String),
#[error("ssh tunnel source not found: {0}")]
TunnelSourceMissing(String),
#[error("cursor no longer available: {0}")]
CursorExpired(String),
#[error("pool exhausted: {0} of {1} connections leased")]
PoolExhausted(usize, usize),
#[error("postgres driver error: {0}")]
Driver(#[from] tokio_postgres::Error),
}
pub struct PgPool {
config: PgConfig,
tunnel: Option<Arc<SshTunnel>>,
tls_connector: TlsConnectorKind,
inner: Mutex<PoolInner>,
max_size: usize,
idle_timeout: Duration,
min_idle: usize,
secondary_browsers: Mutex<HashMap<String, Arc<Mutex<PooledConnection>>>>,
eviction_cancel: CancellationToken,
}
struct IdleEntry {
since: Instant,
conn: Arc<Mutex<PooledConnection>>,
}
#[derive(Clone)]
enum TlsConnectorKind {
NoTls,
Rustls(MakeRustlsConnect),
}
struct PoolInner {
idle: Vec<IdleEntry>,
leased: HashMap<String, Arc<Mutex<PooledConnection>>>,
total: usize,
}
struct PooledConnection {
client: Client,
cancel_token: CancelToken,
active_cursor: Option<ActiveCursor>,
connection_task: Option<JoinHandle<()>>,
}
impl PgPool {
pub async fn connect(
cfg: PgConfig,
ssh_client: Option<Arc<RwLock<SshClient>>>,
) -> Result<Arc<Self>, PgError> {
let tunnel: Option<Arc<SshTunnel>> = if let Some(tunnel_ref) = cfg.ssh_tunnel.as_ref() {
let Some(ssh) = ssh_client else {
return Err(PgError::Tunnel(
"ssh tunnel requested but no ssh client supplied".into(),
));
};
let t = SshTunnel::open(ssh, tunnel_ref.remote_host.clone(), tunnel_ref.remote_port)
.await
.map_err(|e| PgError::Tunnel(format!("failed to open ssh tunnel: {e}")))?;
Some(Arc::new(t))
} else {
None
};
let tls_connector = build_tls_connector(&cfg)?;
let first = open_one(&cfg, tunnel.as_deref(), &tls_connector).await?;
let now = Instant::now();
let max_size = cfg
.max_pool_size
.map(|n| n as usize)
.filter(|&n| n > 0)
.unwrap_or(DEFAULT_MAX_POOL_SIZE);
let idle_timeout = cfg
.idle_timeout_secs
.map(Duration::from_secs)
.unwrap_or(DEFAULT_IDLE_TIMEOUT);
let min_idle = cfg
.min_idle_connections
.map(|n| n as usize)
.unwrap_or(DEFAULT_MIN_IDLE_CONNECTIONS);
let pool = Arc::new(Self {
config: cfg,
tunnel,
tls_connector,
inner: Mutex::new(PoolInner {
idle: vec![IdleEntry {
since: now,
conn: Arc::new(Mutex::new(first)),
}],
leased: HashMap::new(),
total: 1,
}),
max_size,
idle_timeout,
min_idle,
secondary_browsers: Mutex::new(HashMap::new()),
eviction_cancel: CancellationToken::new(),
});
let weak = Arc::downgrade(&pool);
let cancel = pool.eviction_cancel.clone();
tokio::spawn(run_eviction(weak, cancel));
Ok(pool)
}
pub async fn list_databases(&self) -> Result<Vec<DbSummary>, PgError> {
let conn = self.lease_for_session(BROWSER_SESSION_ID).await?;
let guard = conn.lock().await;
Ok(introspect::list_databases(&guard.client).await?)
}
pub async fn list_schemas(&self) -> Result<Vec<SchemaSummary>, PgError> {
self.list_schemas_in(None).await
}
pub async fn list_schemas_in(
&self,
database: Option<&str>,
) -> Result<Vec<SchemaSummary>, PgError> {
let conn = self.browser_connection_for(database).await?;
let guard = conn.lock().await;
Ok(introspect::list_schemas(&guard.client).await?)
}
pub async fn list_relations(&self, schema: &str) -> Result<Vec<Relation>, PgError> {
self.list_relations_in(schema, None).await
}
pub async fn list_relations_in(
&self,
schema: &str,
database: Option<&str>,
) -> Result<Vec<Relation>, PgError> {
let conn = self.browser_connection_for(database).await?;
let guard = conn.lock().await;
Ok(introspect::list_relations(&guard.client, schema).await?)
}
pub async fn list_schema_contents_in(
&self,
schema: &str,
database: Option<&str>,
) -> Result<SchemaContents, PgError> {
let conn = self.browser_connection_for(database).await?;
let guard = conn.lock().await;
Ok(introspect::list_schema_contents(&guard.client, schema).await?)
}
async fn browser_connection_for(
&self,
database: Option<&str>,
) -> Result<Arc<Mutex<PooledConnection>>, PgError> {
let target = database.unwrap_or(self.config.database.as_str());
if target == self.config.database {
return self.lease_for_session(BROWSER_SESSION_ID).await;
}
{
let map = self.secondary_browsers.lock().await;
if let Some(c) = map.get(target) {
return Ok(c.clone());
}
}
let mut cfg = self.config.clone();
cfg.database = target.to_string();
let conn = open_one(&cfg, self.tunnel.as_deref(), &self.tls_connector).await?;
let arc = Arc::new(Mutex::new(conn));
let mut map = self.secondary_browsers.lock().await;
if let Some(existing) = map.get(target) {
return Ok(existing.clone());
}
map.insert(target.to_string(), arc.clone());
Ok(arc)
}
pub async fn describe_columns(
&self,
schema: &str,
table: &str,
) -> Result<Vec<ColumnDetail>, PgError> {
let conn = self.lease_for_session(BROWSER_SESSION_ID).await?;
let guard = conn.lock().await;
Ok(introspect::describe_columns(&guard.client, schema, table).await?)
}
pub async fn execute(
&self,
session_id: &str,
sql: &str,
page_size: usize,
) -> Result<ExecutionOutcome, PgError> {
let conn = self.lease_for_session(session_id).await?;
let mut guard = conn.lock().await;
let previous = guard.active_cursor.take();
let (outcome, new_cursor) =
exec::open_query(&guard.client, sql, page_size, previous).await?;
guard.active_cursor = new_cursor;
Ok(outcome)
}
pub async fn fetch_page(
&self,
session_id: &str,
cursor_id: &str,
count: usize,
) -> Result<PageResult, PgError> {
let conn = self
.leased_only(session_id)
.await
.ok_or_else(|| PgError::CursorExpired(format!("no active session {session_id}")))?;
let guard = conn.lock().await;
let Some(cursor) = guard.active_cursor.as_ref() else {
return Err(PgError::CursorExpired(format!(
"session {session_id} has no active cursor"
)));
};
if cursor.cursor_id != cursor_id {
return Err(PgError::CursorExpired(format!(
"session {session_id} active cursor is {} (looking for {cursor_id})",
cursor.cursor_id
)));
}
let cursor_clone = cursor.clone();
let client = &guard.client;
exec::fetch_page(client, &cursor_clone, count).await
}
#[allow(clippy::too_many_arguments)]
pub async fn update_cell(
&self,
session_id: &str,
schema: &str,
table: &str,
column: &str,
column_type: &str,
new_value: Option<&str>,
ctid: &str,
) -> Result<UpdateOutcome, PgError> {
let conn = self.lease_for_session(session_id).await?;
let guard = conn.lock().await;
edit::update_cell(
&guard.client,
schema,
table,
column,
column_type,
new_value,
ctid,
)
.await
}
pub async fn insert_row(
&self,
session_id: &str,
schema: &str,
table: &str,
inputs: &[InsertColumnInput],
return_columns: &[String],
) -> Result<InsertedRow, PgError> {
let conn = self.lease_for_session(session_id).await?;
let guard = conn.lock().await;
edit::insert_row(&guard.client, schema, table, inputs, return_columns).await
}
pub async fn delete_rows(
&self,
session_id: &str,
schema: &str,
table: &str,
ctids: &[String],
) -> Result<UpdateOutcome, PgError> {
let conn = self.lease_for_session(session_id).await?;
let guard = conn.lock().await;
edit::delete_rows(&guard.client, schema, table, ctids).await
}
pub async fn close_query(&self, session_id: &str, cursor_id: &str) -> Result<(), PgError> {
let Some(conn) = self.leased_only(session_id).await else {
return Ok(()); };
let mut guard = conn.lock().await;
if let Some(c) = guard.active_cursor.as_ref()
&& c.cursor_id == cursor_id
{
let cursor = guard.active_cursor.take().expect("just checked");
let client = &guard.client;
exec::close_query(client, &cursor).await;
}
Ok(())
}
pub async fn cancel(&self, session_id: &str) -> Result<(), PgError> {
let Some(conn) = self.leased_only(session_id).await else {
return Ok(());
};
let token = {
let guard = conn.lock().await;
guard.cancel_token.clone()
};
match &self.tls_connector {
TlsConnectorKind::NoTls => token.cancel_query(tokio_postgres::NoTls).await,
TlsConnectorKind::Rustls(connector) => token.cancel_query(connector.clone()).await,
}
.map_err(PgError::Driver)
}
pub async fn release_session(&self, session_id: &str) {
let Some(conn) = self.take_lease(session_id).await else {
return;
};
{
let mut guard = conn.lock().await;
if let Some(cursor) = guard.active_cursor.take() {
exec::close_query(&guard.client, &cursor).await;
}
}
let mut inner = self.inner.lock().await;
inner.idle.push(IdleEntry {
since: Instant::now(),
conn,
});
}
pub async fn shutdown(&self) {
self.eviction_cancel.cancel();
let mut inner = self.inner.lock().await;
let mut conns: Vec<Arc<Mutex<PooledConnection>>> =
inner.idle.drain(..).map(|e| e.conn).collect();
conns.extend(inner.leased.drain().map(|(_, c)| c));
inner.total = 0;
drop(inner);
let secondaries: Vec<Arc<Mutex<PooledConnection>>> = {
let mut map = self.secondary_browsers.lock().await;
map.drain().map(|(_, c)| c).collect()
};
let conns_with_secondaries = conns.into_iter().chain(secondaries);
let conns: Vec<Arc<Mutex<PooledConnection>>> = conns_with_secondaries.collect();
for conn in conns {
let mut guard = conn.lock().await;
if let Some(task) = guard.connection_task.take() {
task.abort();
}
}
}
async fn lease_for_session(
&self,
session_id: &str,
) -> Result<Arc<Mutex<PooledConnection>>, PgError> {
{
let inner = self.inner.lock().await;
if let Some(c) = inner.leased.get(session_id) {
return Ok(c.clone());
}
}
let from_idle = {
let mut inner = self.inner.lock().await;
inner.idle.pop().map(|e| e.conn)
};
if let Some(conn) = from_idle {
self.assign_lease(session_id, conn.clone()).await;
return Ok(conn);
}
let need_new = {
let inner = self.inner.lock().await;
if inner.total >= self.max_size {
return Err(PgError::PoolExhausted(inner.total, self.max_size));
}
true
};
if need_new {
{
let mut inner = self.inner.lock().await;
if inner.total >= self.max_size {
return Err(PgError::PoolExhausted(inner.total, self.max_size));
}
inner.total += 1;
}
let new_conn =
match open_one(&self.config, self.tunnel.as_deref(), &self.tls_connector).await {
Ok(c) => c,
Err(e) => {
let mut inner = self.inner.lock().await;
inner.total = inner.total.saturating_sub(1);
return Err(e);
}
};
let conn = Arc::new(Mutex::new(new_conn));
self.assign_lease(session_id, conn.clone()).await;
return Ok(conn);
}
unreachable!()
}
async fn assign_lease(&self, session_id: &str, conn: Arc<Mutex<PooledConnection>>) {
let mut inner = self.inner.lock().await;
inner.leased.insert(session_id.to_string(), conn);
}
async fn evict_idle(&self) {
let now = Instant::now();
let to_drop: Vec<Arc<Mutex<PooledConnection>>>;
{
let mut inner = self.inner.lock().await;
let snapshot = std::mem::take(&mut inner.idle);
let mut keep: Vec<IdleEntry> = Vec::with_capacity(snapshot.len());
let mut drop_list: Vec<Arc<Mutex<PooledConnection>>> = Vec::new();
for entry in snapshot.into_iter() {
let aged = now.duration_since(entry.since) >= self.idle_timeout;
if !aged || keep.len() < self.min_idle {
keep.push(entry);
} else {
drop_list.push(entry.conn);
inner.total = inner.total.saturating_sub(1);
}
}
inner.idle = keep;
to_drop = drop_list;
}
if !to_drop.is_empty() {
tracing::debug!(
target: "postgres::pool",
count = to_drop.len(),
"evicted idle postgres connections"
);
}
for conn in to_drop {
let mut guard = conn.lock().await;
if let Some(task) = guard.connection_task.take() {
task.abort();
}
}
}
async fn leased_only(&self, session_id: &str) -> Option<Arc<Mutex<PooledConnection>>> {
let inner = self.inner.lock().await;
inner.leased.get(session_id).cloned()
}
async fn take_lease(&self, session_id: &str) -> Option<Arc<Mutex<PooledConnection>>> {
let mut inner = self.inner.lock().await;
inner.leased.remove(session_id)
}
}
impl Drop for PgPool {
fn drop(&mut self) {
self.eviction_cancel.cancel();
if let Ok(mut inner) = self.inner.try_lock() {
let mut conns: Vec<Arc<Mutex<PooledConnection>>> =
inner.idle.drain(..).map(|e| e.conn).collect();
conns.extend(inner.leased.drain().map(|(_, c)| c));
for conn in conns {
if let Ok(mut guard) = conn.try_lock()
&& let Some(task) = guard.connection_task.take()
{
task.abort();
}
}
}
if let Ok(mut map) = self.secondary_browsers.try_lock() {
for (_, conn) in map.drain() {
if let Ok(mut guard) = conn.try_lock()
&& let Some(task) = guard.connection_task.take()
{
task.abort();
}
}
}
}
}
async fn run_eviction(pool: Weak<PgPool>, cancel: CancellationToken) {
let mut ticker = tokio::time::interval(EVICTION_INTERVAL);
ticker.tick().await;
loop {
tokio::select! {
_ = cancel.cancelled() => return,
_ = ticker.tick() => {
let Some(pool) = pool.upgrade() else { return };
pool.evict_idle().await;
drop(pool);
}
}
}
}
async fn open_one(
cfg: &PgConfig,
tunnel: Option<&SshTunnel>,
tls: &TlsConnectorKind,
) -> Result<PooledConnection, PgError> {
let driver_cfg = build_driver_config(cfg, tunnel)?;
match tls {
TlsConnectorKind::NoTls => {
let (client, connection) = driver_cfg
.connect(tokio_postgres::NoTls)
.await
.map_err(classify_connect_error)?;
Ok(spawn_connection(client, connection))
}
TlsConnectorKind::Rustls(connector) => {
let (client, connection) = driver_cfg
.connect(connector.clone())
.await
.map_err(classify_connect_error)?;
Ok(spawn_connection(client, connection))
}
}
}
fn build_tls_connector(cfg: &PgConfig) -> Result<TlsConnectorKind, PgError> {
match cfg.tls {
PgTlsMode::Disable => Ok(TlsConnectorKind::NoTls),
PgTlsMode::Prefer | PgTlsMode::Require | PgTlsMode::VerifyFull => {
let _ = rustls::crypto::ring::default_provider().install_default();
let tls_config = build_rustls_config(cfg.tls)?;
Ok(TlsConnectorKind::Rustls(MakeRustlsConnect::new(tls_config)))
}
}
}
fn build_driver_config(
cfg: &PgConfig,
tunnel: Option<&SshTunnel>,
) -> Result<PgDriverConfig, PgError> {
let mut driver = PgDriverConfig::new();
if let Some(t) = tunnel {
driver.host("127.0.0.1").port(t.local_port());
} else {
driver.host(&cfg.host).port(cfg.port);
}
driver.dbname(&cfg.database).user(&cfg.user);
let password = match &cfg.auth {
PgAuthMethod::Password { password } => password.clone(),
PgAuthMethod::Keychain { account } => crate::keychain::load_password(
crate::keychain::CredentialKind::PostgresPassword,
account,
)
.map_err(|e| PgError::Auth(format!("keychain load failed for {account}: {e}")))?
.ok_or_else(|| {
PgError::Auth(format!("no keychain entry for postgres account {account}"))
})?,
};
if !password.is_empty() {
driver.password(password);
}
if let Some(name) = &cfg.application_name {
driver.application_name(name);
}
if let Some(secs) = cfg.connect_timeout_secs {
driver.connect_timeout(Duration::from_secs(secs));
}
driver.ssl_mode(match cfg.tls {
PgTlsMode::Disable => PgSslMode::Disable,
PgTlsMode::Prefer => PgSslMode::Prefer,
PgTlsMode::Require | PgTlsMode::VerifyFull => PgSslMode::Require,
});
Ok(driver)
}
fn build_rustls_config(mode: PgTlsMode) -> Result<RustlsClientConfig, PgError> {
let mut roots = rustls::RootCertStore::empty();
let native = rustls_native_certs::load_native_certs();
for cert in native.certs {
let _ = roots.add(cert);
}
let cfg = match mode {
PgTlsMode::VerifyFull => RustlsClientConfig::builder()
.with_root_certificates(roots)
.with_no_client_auth(),
_ => RustlsClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(std::sync::Arc::new(NoCertVerifier))
.with_no_client_auth(),
};
Ok(cfg)
}
fn classify_connect_error(e: tokio_postgres::Error) -> PgError {
if let Some(db_err) = e.as_db_error() {
let code = db_err.code().code();
if code == "28P01" || code == "28000" {
return PgError::Auth(db_err.message().to_string());
}
}
PgError::Connect(e.to_string())
}
fn spawn_connection<S, T>(
client: Client,
connection: tokio_postgres::Connection<S, T>,
) -> PooledConnection
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
T: tokio_postgres::tls::TlsStream + Unpin + Send + 'static,
{
let cancel_token = client.cancel_token();
let task = tokio::spawn(async move {
if let Err(e) = connection.await {
tracing::warn!("postgres connection task ended with error: {e}");
}
});
PooledConnection {
client,
cancel_token,
active_cursor: None,
connection_task: Some(task),
}
}
#[derive(Debug)]
struct NoCertVerifier;
impl rustls::client::danger::ServerCertVerifier for NoCertVerifier {
fn verify_server_cert(
&self,
_end_entity: &rustls::pki_types::CertificateDer<'_>,
_intermediates: &[rustls::pki_types::CertificateDer<'_>],
_server_name: &rustls::pki_types::ServerName<'_>,
_ocsp_response: &[u8],
_now: rustls::pki_types::UnixTime,
) -> Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
Ok(rustls::client::danger::ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &rustls::pki_types::CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
vec![
rustls::SignatureScheme::RSA_PKCS1_SHA256,
rustls::SignatureScheme::RSA_PKCS1_SHA384,
rustls::SignatureScheme::RSA_PKCS1_SHA512,
rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
rustls::SignatureScheme::ED25519,
rustls::SignatureScheme::RSA_PSS_SHA256,
rustls::SignatureScheme::RSA_PSS_SHA384,
rustls::SignatureScheme::RSA_PSS_SHA512,
]
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn pool_connect_with_tunnel_requires_ssh_client() {
let cfg = PgConfig {
ssh_tunnel: Some(SshTunnelRef {
ssh_connection_id: "ssh-1".to_string(),
remote_host: "db".to_string(),
remote_port: 5432,
}),
..PgConfig::local("db", "u")
};
let rt = tokio::runtime::Runtime::new().unwrap();
match rt.block_on(PgPool::connect(cfg, None)) {
Err(PgError::Tunnel(detail)) => {
assert!(detail.contains("ssh client"));
}
Err(other) => panic!("expected Tunnel error, got {other:?}"),
Ok(_) => panic!("expected error, got Ok"),
}
}
#[test]
fn driver_config_uses_correct_ssl_mode() {
let mut cfg = PgConfig::local("db", "u");
cfg.tls = PgTlsMode::Require;
let driver = build_driver_config(&cfg, None).expect("driver cfg");
assert!(matches!(driver.get_ssl_mode(), PgSslMode::Require));
cfg.tls = PgTlsMode::Disable;
let driver = build_driver_config(&cfg, None).expect("driver cfg");
assert!(matches!(driver.get_ssl_mode(), PgSslMode::Disable));
}
#[test]
fn driver_config_omits_password_when_empty() {
let cfg = PgConfig::local("db", "u");
let driver = build_driver_config(&cfg, None).expect("driver cfg");
assert!(driver.get_password().is_none());
}
#[test]
fn eviction_policy_keeps_min_idle_and_drops_aged() {
fn run_policy(
entries: Vec<(usize, Instant)>,
now: Instant,
idle_timeout: Duration,
min_idle: usize,
) -> (Vec<usize>, Vec<usize>) {
let mut keep: Vec<usize> = Vec::new();
let mut drop_idx: Vec<usize> = Vec::new();
for (idx, since) in entries {
let aged = now.duration_since(since) >= idle_timeout;
if !aged || keep.len() < min_idle {
keep.push(idx);
} else {
drop_idx.push(idx);
}
}
(keep, drop_idx)
}
let now = Instant::now();
let timeout = Duration::from_secs(300);
let aged = now - timeout - Duration::from_secs(1);
let fresh = now - Duration::from_secs(10);
let (keep, drop_idx) = run_policy(
vec![(0, aged), (1, aged), (2, fresh), (3, aged)],
now,
timeout,
1,
);
assert_eq!(keep, vec![0, 2]);
assert_eq!(drop_idx, vec![1, 3]);
let (keep, drop_idx) = run_policy(vec![(0, aged), (1, fresh), (2, aged)], now, timeout, 0);
assert_eq!(keep, vec![1]);
assert_eq!(drop_idx, vec![0, 2]);
}
}