use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use super::{
super::{percent_encode, Client},
parse_gs_url, StorageObject,
};
use crate::common::*;
/// Query parameters for the GCS JSON API "objects: list" call.
///
/// Serialized with camelCase keys (`prefix`, `pageToken`) to match the
/// REST API's expected query-string names.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ListQuery<'a> {
    /// Only return objects whose names begin with this prefix.
    prefix: &'a str,
    /// Continuation token from the previous page's response. Skipped
    /// entirely on the first request so no empty `pageToken=` appears in
    /// the query string.
    #[serde(skip_serializing_if = "Option::is_none")]
    page_token: Option<String>,
}
/// Response body from the GCS JSON API "objects: list" call.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ListResponse {
    /// Resource kind reported by the API (presumably "storage#objects" —
    /// verify against the GCS docs). Deserialized but never read.
    #[allow(dead_code)]
    kind: String,
    /// Token to request the next page; `None` once the final page has
    /// been returned.
    next_page_token: Option<String>,
    /// Objects on this page. Defaults to empty because the API omits the
    /// `items` field entirely when nothing matches.
    #[serde(default)]
    items: Vec<StorageObject>,
}
/// Unwrap a `Result`, or on `Err`: log the error, forward it to the
/// stream consumer via `$sender`, and `return Ok(())` to shut the worker
/// down cleanly (the consumer has already been told what went wrong).
///
/// Must be invoked inside an `async` block returning `Result<()>`,
/// because the expansion uses both `.await` and `return Ok(())`.
///
/// NOTE(review): `$ctx` is matched but never expanded, so the token
/// passed there (call sites use `worker_ctx`, which is not even a name in
/// scope) is never evaluated or resolved. It is retained only for
/// call-site compatibility; consider removing it from all call sites.
macro_rules! try_and_forward_errors {
    // `$(,)?` accepts an optional trailing comma, replacing the former
    // duplicate forwarding arm.
    ($ctx:expr, $expression:expr, $sender:expr $(,)?) => {
        match $expression {
            Ok(val) => val,
            Err(err) => {
                error!("error in gcloud worker: {}", err);
                $sender.send(Err(err.into())).await.map_send_err()?;
                return Ok(());
            }
        }
    };
}
#[instrument(level = "trace", skip(ctx))]
pub(crate) async fn ls(
ctx: &Context,
url: &Url,
) -> Result<impl Stream<Item = Result<StorageObject>> + Send + Unpin + 'static> {
debug!("listing {}", url);
let (bucket, object) = parse_gs_url(url)?;
let dir_prefix = if object.ends_with('/') {
object.clone()
} else {
format!("{}/", object)
};
let (sender, receiver) = mpsc::channel::<Result<StorageObject>>(1);
let worker: BoxFuture<()> = async move {
let client = try_and_forward_errors!(worker_ctx, Client::new().await, sender,);
let mut seen = HashSet::new();
let req_url = format!(
"https://storage.googleapis.com/storage/v1/b/{}/o",
percent_encode(&bucket),
);
let mut page_token = None;
loop {
let query = ListQuery {
prefix: &object,
page_token: page_token.clone(),
};
let get_result = client.get::<ListResponse, _, _>(&req_url, query).await;
let mut res = try_and_forward_errors!(worker_ctx, get_result, sender);
let next_page_token = res.next_page_token.take();
if page_token.is_some() && page_token == next_page_token {
return Err(format_err!(
"tried to list page {:?} of files twice",
page_token
));
}
page_token = next_page_token;
for item in res.items {
if !seen.insert(item.name.clone()) {
continue;
}
if !item.name.to_ascii_lowercase().ends_with(".csv") {
continue;
}
if item.name != object && !item.name.starts_with(&dir_prefix) {
trace!("filtered false match {:?}", item.name);
continue;
}
sender.send(Ok(item)).await.map_err(|_| {
format_err!(
"error sending data to stream (perhaps it was closed)",
)
})?;
}
if page_token.is_none() {
break;
}
}
Ok(())
}
.in_current_span()
.boxed();
ctx.spawn_worker(worker);
Ok(ReceiverStream::new(receiver))
}