use anyhow::Result;
use async_trait::async_trait;
use drasi_core::models::{Element, SourceChange};
use log::info;
use std::sync::Arc;
use tokio::sync::RwLock;
use drasi_lib::bootstrap::BootstrapRequest;
use drasi_lib::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapResult};
pub struct ApplicationBootstrapProvider {
bootstrap_data: Arc<RwLock<Vec<SourceChange>>>,
}
impl ApplicationBootstrapProvider {
pub fn new() -> Self {
Self {
bootstrap_data: Arc::new(RwLock::new(Vec::new())),
}
}
pub fn with_shared_data(bootstrap_data: Arc<RwLock<Vec<SourceChange>>>) -> Self {
Self { bootstrap_data }
}
pub fn builder() -> ApplicationBootstrapProviderBuilder {
ApplicationBootstrapProviderBuilder::new()
}
}
impl Default for ApplicationBootstrapProvider {
fn default() -> Self {
Self::new()
}
}
pub struct ApplicationBootstrapProviderBuilder {
shared_data: Option<Arc<RwLock<Vec<SourceChange>>>>,
}
impl ApplicationBootstrapProviderBuilder {
pub fn new() -> Self {
Self { shared_data: None }
}
pub fn with_shared_data(mut self, data: Arc<RwLock<Vec<SourceChange>>>) -> Self {
self.shared_data = Some(data);
self
}
pub fn build(self) -> ApplicationBootstrapProvider {
match self.shared_data {
Some(data) => ApplicationBootstrapProvider::with_shared_data(data),
None => ApplicationBootstrapProvider::new(),
}
}
}
impl Default for ApplicationBootstrapProviderBuilder {
fn default() -> Self {
Self::new()
}
}
impl ApplicationBootstrapProvider {
pub async fn store_insert_event(&self, change: SourceChange) {
if matches!(change, SourceChange::Insert { .. }) {
self.bootstrap_data.write().await.push(change);
}
}
pub async fn get_stored_events(&self) -> Vec<SourceChange> {
self.bootstrap_data.read().await.clone()
}
pub async fn clear_stored_events(&self) {
self.bootstrap_data.write().await.clear();
}
fn matches_labels(&self, change: &SourceChange, request: &BootstrapRequest) -> bool {
match change {
SourceChange::Insert { element } | SourceChange::Update { element, .. } => {
match element {
Element::Node { metadata, .. } => {
request.node_labels.is_empty()
|| metadata
.labels
.iter()
.any(|l| request.node_labels.contains(&l.to_string()))
}
Element::Relation { metadata, .. } => {
request.relation_labels.is_empty()
|| metadata
.labels
.iter()
.any(|l| request.relation_labels.contains(&l.to_string()))
}
}
}
SourceChange::Delete { metadata } => {
request.node_labels.is_empty() && request.relation_labels.is_empty()
|| metadata.labels.iter().any(|l| {
request.node_labels.contains(&l.to_string())
|| request.relation_labels.contains(&l.to_string())
})
}
SourceChange::Future { .. } => {
false
}
}
}
}
#[async_trait]
impl BootstrapProvider for ApplicationBootstrapProvider {
async fn bootstrap(
&self,
request: BootstrapRequest,
_context: &BootstrapContext,
_event_tx: drasi_lib::channels::BootstrapEventSender,
_settings: Option<&drasi_lib::config::SourceSubscriptionSettings>,
) -> Result<BootstrapResult> {
info!(
"ApplicationBootstrapProvider processing bootstrap request for query '{}' with {} node labels and {} relation labels",
request.query_id,
request.node_labels.len(),
request.relation_labels.len()
);
let bootstrap_data = self.bootstrap_data.read().await;
let mut count = 0;
if bootstrap_data.is_empty() {
info!(
"ApplicationBootstrapProvider: No stored events to replay for query '{}'",
request.query_id
);
return Ok(BootstrapResult::default());
}
info!(
"ApplicationBootstrapProvider: Replaying {} stored events for query '{}'",
bootstrap_data.len(),
request.query_id
);
for change in bootstrap_data.iter() {
if self.matches_labels(change, &request) {
count += 1;
}
}
info!(
"ApplicationBootstrapProvider sent {} bootstrap events for query '{}'",
count, request.query_id
);
Ok(BootstrapResult {
event_count: count,
last_sequence: None,
sequences_aligned: false,
})
}
}