#[derive(Debug)]
pub struct Async<'c> {
last_result: Option<crate::Result<crate::pq::Result>>,
connection: &'c std::sync::Mutex<libpq::Connection>,
}
impl std::future::Future for Async<'_> {
type Output = crate::Result<crate::pq::Result>;
fn poll(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let connection = match self.connection.lock() {
Ok(connection) => connection,
Err(_) => return std::task::Poll::Pending,
};
if let Some(result) = connection.result() {
self.last_result = Some(result.try_into());
} else {
let last_result = self.last_result.take();
if let Some(result) = last_result {
return std::task::Poll::Ready(result);
}
}
ctx.waker().wake_by_ref();
std::task::Poll::Pending
}
}
impl<'c> Async<'c> {
pub(crate) fn new(connection: &'c std::sync::Mutex<libpq::Connection>) -> Self {
Self {
last_result: None,
connection,
}
}
pub async fn execute(self, query: &str) -> crate::Result<crate::pq::Result> {
self.connection
.lock()
.map_err(|e| crate::Error::Mutex(e.to_string()))?
.send_query(query)
.map_err(crate::Error::Async)?;
self.await
}
pub async fn query<E: crate::Entity>(
self,
query: &str,
params: &[&dyn crate::ToSql],
) -> crate::Result<crate::Rows<E>> {
Ok(self.send_query(query, params).await?.into())
}
pub async fn query_one<E: crate::Entity>(
self,
query: &str,
params: &[&dyn crate::ToSql],
) -> crate::Result<E> {
match self.query(query, params).await?.try_get(0) {
Some(e) => Ok(e),
None => Err(crate::Error::MissingField("0".to_string())),
}
}
async fn send_query(
self,
query: &str,
params: &[&dyn crate::ToSql],
) -> crate::Result<crate::pq::Result> {
let param = crate::Connection::transform_params(params)?;
self.connection
.lock()
.map_err(|e| crate::Error::Mutex(e.to_string()))?
.send_query_params(
query,
¶m.types,
¶m
.values
.iter()
.map(|x| x.as_deref())
.collect::<Vec<_>>(),
&[],
crate::pq::Format::Binary,
)
.map_err(crate::Error::Async)?;
self.await
}
}