use std::collections::HashMap;
use std::sync::Arc;
use futures::TryStreamExt;
use futures::future::BoxFuture;
use object_store::{ObjectStore, path::Path};
use tracing::debug;
use crate::{
error::{ExternalError, Result},
watcher::{ExternalEvent, ExternalSource},
};
/// Snapshot of a single object as reported by an object-store listing.
///
/// Derives equality and hashing so callers can compare, deduplicate, or key
/// collections by the objects carried in change events.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StoredObject {
    /// Object location rendered as a string (from `ObjectMeta::location`).
    pub path: String,
    /// ETag reported by the store, if any. Always `None` in removal events.
    pub etag: Option<String>,
    /// Object size as reported by the store (`ObjectMeta::size`).
    /// Always `0` in removal events.
    pub size: usize,
}
/// Change-detecting [`ExternalSource`] backed by an [`ObjectStore`] listing.
///
/// Each poll lists the store (optionally restricted to a prefix) and diffs the
/// result against the state recorded by the previous successful poll.
pub struct ObjectStorePoller {
    /// Human-readable identifier returned by `ExternalSource::name`.
    name: String,
    /// Store that is listed on every poll.
    store: Arc<dyn ObjectStore>,
    /// Optional listing prefix; `None` lists the whole store.
    prefix: Option<Path>,
    /// Path -> change token recorded by the last successful poll
    /// (the object's ETag when the store reports one).
    known: HashMap<String, String>,
}
impl ObjectStorePoller {
    /// Creates a poller over the whole of `store`.
    ///
    /// The source name defaults to the store's `Display` output; override it
    /// with [`with_name`](Self::with_name).
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self {
            name: store.to_string(),
            store,
            prefix: None,
            known: HashMap::new(),
        }
    }

    /// Restricts polling to objects under `prefix`.
    ///
    /// The prefix (with surrounding `/` stripped) is appended to the current
    /// name, so a name set earlier via [`with_name`](Self::with_name) is
    /// extended, not replaced. An empty or all-`/` prefix leaves the name
    /// unchanged.
    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
        let prefix = prefix.into();
        // Strip separators so "store" + "/data/" renders as "store/data"
        // rather than "store//data/".
        let label = prefix.trim_matches('/');
        if !label.is_empty() {
            self.name = format!("{}/{}", self.name.trim_end_matches('/'), label);
        }
        self.prefix = Some(Path::from(prefix.as_str()));
        self
    }

    /// Replaces the source name reported by `ExternalSource::name`.
    pub fn with_name(mut self, name: impl Into<String>) -> Self {
        self.name = name.into();
        self
    }
}
impl ExternalSource for ObjectStorePoller {
    type Item = StoredObject;

    fn name(&self) -> &str {
        &self.name
    }

    /// Lists the store and reports what changed since the last successful poll.
    ///
    /// Emits `Added` for newly seen paths and `Modified` for paths whose change
    /// token differs. Both are in path order, followed by `Removed` (also in
    /// path order) for paths that disappeared. `Removed` events carry only the
    /// path (`etag: None`, `size: 0`).
    ///
    /// The change token is the object's ETag when the store reports one;
    /// otherwise it is a fingerprint of size and last-modified time. Objects
    /// without an ETag are therefore tracked too, instead of being re-reported
    /// as `Added` on every poll.
    ///
    /// # Errors
    ///
    /// Returns `ExternalError::Internal` if listing fails; in that case the
    /// recorded state is left untouched.
    fn poll(&mut self) -> BoxFuture<'_, Result<Vec<ExternalEvent<StoredObject>>>> {
        let store = Arc::clone(&self.store);
        let prefix = self.prefix.clone();
        let this = self;
        Box::pin(async move {
            let listing: Vec<_> = store
                .list(prefix.as_ref())
                .try_collect()
                .await
                .map_err(|e| ExternalError::Internal(format!("object store list failed: {e}")))?;

            // path -> (object snapshot, change token)
            let mut current: HashMap<String, (StoredObject, String)> =
                HashMap::with_capacity(listing.len());
            for meta in listing {
                let path = meta.location.to_string();
                // The leading NUL keeps fallback tokens distinct from real ETags,
                // which (per RFC 9110) cannot contain control characters.
                let token = match meta.e_tag.as_deref() {
                    Some(etag) => etag.to_owned(),
                    None => format!("\0size={};modified={}", meta.size, meta.last_modified),
                };
                let object = StoredObject {
                    path: path.clone(),
                    etag: meta.e_tag,
                    size: meta.size,
                };
                current.insert(path, (object, token));
            }

            let mut events: Vec<ExternalEvent<StoredObject>> = Vec::new();
            let (mut added, mut modified, mut removed) = (0usize, 0usize, 0usize);

            // Sort so event order is deterministic rather than hash-order.
            let mut listed: Vec<(&String, &(StoredObject, String))> = current.iter().collect();
            listed.sort_unstable_by(|a, b| a.0.cmp(b.0));
            for (path, (object, token)) in listed {
                match this.known.get(path) {
                    None => {
                        added += 1;
                        events.push(ExternalEvent::Added(object.clone()));
                    }
                    Some(previous) if previous != token => {
                        modified += 1;
                        events.push(ExternalEvent::Modified(object.clone()));
                    }
                    Some(_) => {}
                }
            }

            let mut vanished: Vec<&String> = this
                .known
                .keys()
                .filter(|path| !current.contains_key(*path))
                .collect();
            vanished.sort_unstable();
            for path in vanished {
                removed += 1;
                events.push(ExternalEvent::Removed(StoredObject {
                    path: path.clone(),
                    etag: None,
                    size: 0,
                }));
            }

            // Record a token for every listed object, ETag or not.
            this.known = current
                .into_iter()
                .map(|(path, (_, token))| (path, token))
                .collect();

            debug!(
                source = %this.name,
                added,
                modified,
                removed,
                "Object store poll completed"
            );
            Ok(events)
        })
    }
}