use std::future::{Future, IntoFuture};
use std::pin::Pin;
use chrono::{DateTime, FixedOffset};
use crate::jobs::MemoryJobsStore;
use crate::memory::{Memory, RetirementReason};
use crate::store::{EditPatch, IndexStatus, MemoryStore};
use super::{Client, ClientError};
#[must_use = "edit(..) returns a builder that must be awaited"]
pub struct EditBuilder<'a> {
client: &'a Client,
pid: String,
content: Option<String>,
metadata: Option<serde_json::Value>,
event_at: Option<Option<DateTime<FixedOffset>>>,
}
impl<'a> EditBuilder<'a> {
pub(super) fn new(client: &'a Client, pid: String) -> Self {
Self {
client,
pid,
content: None,
metadata: None,
event_at: None,
}
}
pub fn content(mut self, content: impl Into<String>) -> Self {
self.content = Some(content.into());
self
}
pub fn metadata(mut self, metadata: serde_json::Value) -> Self {
self.metadata = Some(metadata);
self
}
pub fn event_at(mut self, event_at: impl Into<DateTime<FixedOffset>>) -> Self {
self.event_at = Some(Some(event_at.into()));
self
}
}
impl<'a> IntoFuture for EditBuilder<'a> {
type Output = Result<Memory, ClientError>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(execute(self))
}
}
async fn execute(builder: EditBuilder<'_>) -> Result<Memory, ClientError> {
let EditBuilder {
client,
pid,
content,
metadata,
event_at,
} = builder;
if let Some(obj) = metadata.as_ref().and_then(|m| m.as_object()) {
for key in obj.keys() {
if crate::vector::qdrant::RESERVED_PAYLOAD_KEYS
.iter()
.any(|reserved| reserved == key)
{
return Err(ClientError::ReservedMetadataKey { key: key.clone() });
}
}
}
let content_changed = content.is_some();
let event_at_changed = event_at.is_some();
let patch = EditPatch {
content,
metadata,
event_at,
};
let inner = client.inner.clone();
let updated = inner.store.edit(&pid, patch).await?;
if content_changed {
inner.store.set_index_status(&pid, IndexStatus::Pending).await?;
inner
.jobs
.enqueue(
crate::jobs::JobKind::Embed,
pid.clone(),
serde_json::json!({ "origin": "edit" }),
)
.await?;
}
if content_changed || event_at_changed {
inner
.jobs
.enqueue(
crate::jobs::JobKind::Reprocess,
pid.clone(),
serde_json::json!({ "reason": RetirementReason::Stale.as_ref() }),
)
.await?;
}
Ok(updated)
}