use std::sync::Arc;
use chrono::{DateTime, Utc};
use entelix_core::ir::Message;
use entelix_core::{ExecutionContext, Result};
use parking_lot::Mutex;
use crate::consolidation::{ConsolidationContext, ConsolidationPolicy};
use crate::namespace::Namespace;
use crate::store::Store;
const DEFAULT_KEY: &str = "buffer";
pub struct BufferMemory {
store: Arc<dyn Store<Vec<Message>>>,
namespace: Namespace,
max_turns: usize,
consolidation: Option<Arc<dyn ConsolidationPolicy>>,
last_consolidated_at: Mutex<Option<DateTime<Utc>>>,
}
impl BufferMemory {
pub fn new(
store: Arc<dyn Store<Vec<Message>>>,
namespace: Namespace,
max_turns: usize,
) -> Self {
Self {
store,
namespace,
max_turns,
consolidation: None,
last_consolidated_at: Mutex::new(None),
}
}
#[must_use]
pub fn with_consolidation_policy(mut self, policy: Arc<dyn ConsolidationPolicy>) -> Self {
self.consolidation = Some(policy);
self
}
pub const fn max_turns(&self) -> usize {
self.max_turns
}
pub const fn namespace(&self) -> &Namespace {
&self.namespace
}
pub fn last_consolidated_at(&self) -> Option<DateTime<Utc>> {
*self.last_consolidated_at.lock()
}
pub fn mark_consolidated(&self, at: DateTime<Utc>) {
*self.last_consolidated_at.lock() = Some(at);
}
pub fn mark_consolidated_now(&self) {
self.mark_consolidated(Utc::now());
}
pub async fn append(&self, ctx: &ExecutionContext, message: Message) -> Result<()> {
let mut messages = self
.store
.get(ctx, &self.namespace, DEFAULT_KEY)
.await?
.unwrap_or_default();
messages.push(message);
while messages.len() > self.max_turns {
messages.remove(0);
}
self.store
.put(ctx, &self.namespace, DEFAULT_KEY, messages)
.await
}
pub async fn messages(&self, ctx: &ExecutionContext) -> Result<Vec<Message>> {
Ok(self
.store
.get(ctx, &self.namespace, DEFAULT_KEY)
.await?
.unwrap_or_default())
}
pub async fn clear(&self, ctx: &ExecutionContext) -> Result<()> {
self.store.delete(ctx, &self.namespace, DEFAULT_KEY).await
}
pub async fn should_consolidate(&self, ctx: &ExecutionContext) -> Result<bool> {
self.should_consolidate_with(ctx, PolicyExtras::default())
.await
}
pub async fn should_consolidate_with(
&self,
ctx: &ExecutionContext,
extras: PolicyExtras,
) -> Result<bool> {
let Some(policy) = self.consolidation.as_ref() else {
return Ok(false);
};
let buffer = self.messages(ctx).await?;
let mut consolidation_ctx = ConsolidationContext::new(&buffer);
let effective_last = extras
.last_consolidated_at
.or_else(|| *self.last_consolidated_at.lock());
if let Some(at) = effective_last {
consolidation_ctx = consolidation_ctx.with_last_consolidated_at(at);
}
if let (Some(used), Some(available)) =
(extras.context_tokens_used, extras.context_tokens_available)
{
consolidation_ctx = consolidation_ctx.with_context_tokens(used, available);
}
Ok(policy.should_consolidate(&consolidation_ctx))
}
}
#[derive(Clone, Copy, Debug, Default)]
pub struct PolicyExtras {
pub last_consolidated_at: Option<DateTime<Utc>>,
pub context_tokens_used: Option<usize>,
pub context_tokens_available: Option<usize>,
}