use std::collections::HashMap;
use chrono::{DateTime, Utc};
use rusqlite::{params, Connection};
use serde::{Deserialize, Serialize};
use crate::error::{EngramError, Result};
use crate::storage::queries::create_memory;
use crate::types::{CreateMemoryInput, MemoryTier, MemoryType};
/// Tuning knobs controlling how a conversation transcript is split into chunks.
#[derive(Debug, Clone)]
pub struct ChunkingConfig {
    /// Maximum number of messages per chunk.
    pub max_messages: usize,
    /// How many trailing messages of one chunk are repeated at the start of
    /// the next, so context carries across chunk boundaries.
    pub overlap_messages: usize,
    /// Soft character budget per chunk; a single oversized message is
    /// truncated to fit rather than dropped.
    pub max_chars: usize,
    /// Default time-to-live (seconds) applied to each chunk's memory.
    pub default_ttl_seconds: i64,
}
impl Default for ChunkingConfig {
fn default() -> Self {
Self {
max_messages: 10,
overlap_messages: 2,
max_chars: 8000,
default_ttl_seconds: 7 * 24 * 60 * 60, }
}
}
/// A single conversation message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
    /// Speaker role, e.g. "user" or "assistant".
    pub role: String,
    pub content: String,
    /// Defaults to the deserialization time when absent from the input.
    #[serde(default = "Utc::now")]
    pub timestamp: DateTime<Utc>,
    /// Optional caller-supplied message id.
    pub id: Option<String>,
}
/// A contiguous run of messages packaged for indexing as one memory.
#[derive(Debug, Clone)]
pub struct ConversationChunk {
    /// Zero-based position of this chunk within the conversation.
    pub chunk_index: usize,
    /// Index of the first message in the chunk (inclusive).
    pub start_index: usize,
    /// Index one past the last message in the chunk (exclusive).
    pub end_index: usize,
    /// The (possibly truncated) messages included in this chunk.
    pub messages: Vec<Message>,
    /// Rendered "[role]: content" text stored as the memory body.
    pub content: String,
    /// Length of `content` in bytes (via `str::len`).
    pub char_count: usize,
}
/// Bookkeeping record for an indexed conversation, mirroring a row of the
/// `sessions` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    pub session_id: String,
    pub title: Option<String>,
    pub agent_id: Option<String>,
    /// Timestamp of the first message (falls back to indexing time).
    pub started_at: DateTime<Utc>,
    /// When the session was last (re)indexed, if ever.
    pub last_indexed_at: Option<DateTime<Utc>>,
    /// Total messages indexed so far.
    pub message_count: i64,
    /// Total chunks created so far.
    pub chunk_count: i64,
    pub workspace: String,
    /// Free-form metadata; used to stash "overlap_messages" for delta indexing.
    pub metadata: HashMap<String, serde_json::Value>,
}
/// Splits a conversation into overlapping chunks bounded by both a message
/// count (`config.max_messages`) and a character budget (`config.max_chars`).
///
/// Messages longer than `max_chars` are individually truncated (head + tail
/// with a visible marker) rather than dropped, so every chunk holds at least
/// one message. Consecutive chunks share up to `config.overlap_messages`
/// trailing messages so context carries across chunk boundaries. Returns an
/// empty vec for empty input.
pub fn chunk_conversation(messages: &[Message], config: &ChunkingConfig) -> Vec<ConversationChunk> {
    if messages.is_empty() {
        return vec![];
    }
    // Defend against a misconfigured `max_messages == 0`, which would make the
    // inner loop consume nothing and the outer loop spin forever.
    let max_messages = config.max_messages.max(1);
    let mut chunks = Vec::new();
    let mut chunk_start = 0;
    while chunk_start < messages.len() {
        let mut current_messages = Vec::new();
        let mut current_chars = 0;
        let mut i = chunk_start;
        while i < messages.len() {
            let msg = &messages[i];
            let msg_chars = msg.content.len();
            // Oversized messages are truncated so no single message can blow
            // past the per-chunk character budget.
            let (content, chars) = if msg_chars > config.max_chars {
                let truncated = truncate_with_marker(&msg.content, config.max_chars);
                let len = truncated.len();
                (truncated, len)
            } else {
                (msg.content.clone(), msg_chars)
            };
            // A chunk always admits its first message, even an over-budget one.
            let would_exceed_chars =
                current_chars + chars > config.max_chars && !current_messages.is_empty();
            let would_exceed_messages = current_messages.len() >= max_messages;
            if would_exceed_chars || would_exceed_messages {
                break;
            }
            current_messages.push(Message {
                role: msg.role.clone(),
                content,
                timestamp: msg.timestamp,
                id: msg.id.clone(),
            });
            current_chars += chars;
            i += 1;
        }
        if !current_messages.is_empty() {
            let chunk_content = format_chunk_content(&current_messages);
            chunks.push(ConversationChunk {
                chunk_index: chunks.len(),
                start_index: chunk_start,
                end_index: i, // exclusive
                messages: current_messages,
                content: chunk_content.clone(),
                char_count: chunk_content.len(),
            });
        }
        // Start the next chunk `overlap` messages back, but never move
        // backwards or stall in place (either would loop forever).
        let overlap = config.overlap_messages.min(i - chunk_start);
        chunk_start = if i >= messages.len() {
            messages.len()
        } else if i > chunk_start + overlap {
            i - overlap
        } else {
            i
        };
    }
    chunks
}
/// Truncates `content` to roughly `max_chars` characters by keeping the head
/// (60% of the remaining budget) and tail (40%), joined by a visible
/// truncation marker. Content at or under the limit is returned unchanged.
///
/// If `max_chars` is too small to fit the marker itself, the content is
/// hard-truncated to `max_chars` characters instead (the previous
/// `max_chars - marker.len()` underflowed / panicked in debug builds there).
///
/// NOTE(review): the entry check compares `content.len()` (bytes) while the
/// truncation counts `char`s, so multi-byte content may land slightly over
/// the byte budget — acceptable for a soft limit.
fn truncate_with_marker(content: &str, max_chars: usize) -> String {
    if content.len() <= max_chars {
        return content.to_string();
    }
    let marker = "\n[...truncated...]\n";
    if max_chars <= marker.len() {
        // No room for head + marker + tail; fall back to a hard cut.
        return content.chars().take(max_chars).collect();
    }
    let available = max_chars - marker.len();
    let head_len = (available * 60) / 100;
    let tail_len = available - head_len;
    let head: String = content.chars().take(head_len).collect();
    // Last `tail_len` chars: reverse, take, then reverse back into order.
    let tail: String = content
        .chars()
        .rev()
        .take(tail_len)
        .collect::<String>()
        .chars()
        .rev()
        .collect();
    format!("{}{}{}", head, marker, tail)
}
/// Renders messages as "[role]: content" paragraphs separated by blank lines;
/// this text becomes the chunk's stored memory body.
fn format_chunk_content(messages: &[Message]) -> String {
    let mut rendered = Vec::with_capacity(messages.len());
    for message in messages {
        rendered.push(format!("[{}]: {}", message.role, message.content));
    }
    rendered.join("\n\n")
}
/// Indexes a full conversation from scratch: chunks `messages`, upserts the
/// `sessions` row, replaces any existing `session_chunks` links, and creates
/// one transcript-chunk memory per chunk.
///
/// The trailing `config.overlap_messages` messages are stored in the session
/// metadata so `index_conversation_delta` can later re-chunk across the
/// boundary between indexing runs.
///
/// # Errors
/// Returns `EngramError::InvalidInput` when `messages` yields no chunks
/// (empty input); propagates serialization and storage errors.
pub fn index_conversation(
    conn: &Connection,
    session_id: &str,
    messages: &[Message],
    config: &ChunkingConfig,
    workspace: Option<&str>,
    title: Option<&str>,
    agent_id: Option<&str>,
) -> Result<Session> {
    let now = Utc::now();
    let workspace = workspace.unwrap_or("default");
    let chunks = chunk_conversation(messages, config);
    if chunks.is_empty() {
        return Err(EngramError::InvalidInput(
            "No messages to index".to_string(),
        ));
    }
    // Last `overlap_messages` messages, in original order, stashed for the
    // next delta-indexing run.
    let overlap_messages: Vec<&Message> = messages
        .iter()
        .rev()
        .take(config.overlap_messages)
        .collect::<Vec<_>>()
        .into_iter()
        .rev()
        .collect();
    let mut session_metadata = HashMap::new();
    session_metadata.insert(
        "overlap_messages".to_string(),
        serde_json::to_value(&overlap_messages).unwrap_or_default(),
    );
    let metadata_json = serde_json::to_string(&session_metadata)?;
    let started_at = messages.first().map(|m| m.timestamp).unwrap_or(now);
    // Upsert the session row. On conflict, workspace/agent_id/started_at are
    // intentionally left as first recorded.
    conn.execute(
        r#"
INSERT INTO sessions (session_id, title, agent_id, started_at, last_indexed_at,
message_count, chunk_count, workspace, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(session_id) DO UPDATE SET
title = COALESCE(excluded.title, sessions.title),
last_indexed_at = excluded.last_indexed_at,
message_count = excluded.message_count,
chunk_count = excluded.chunk_count,
metadata = excluded.metadata
"#,
        params![
            session_id,
            title,
            agent_id,
            started_at.to_rfc3339(),
            now.to_rfc3339(),
            messages.len() as i64,
            chunks.len() as i64,
            workspace,
            metadata_json,
        ],
    )?;
    // Full re-index: drop stale chunk links before inserting fresh ones.
    conn.execute(
        "DELETE FROM session_chunks WHERE session_id = ?",
        params![session_id],
    )?;
    for chunk in &chunks {
        // Per-chunk memory metadata locating the chunk inside the session.
        let mut metadata = HashMap::new();
        metadata.insert("session_id".to_string(), serde_json::json!(session_id));
        metadata.insert(
            "chunk_index".to_string(),
            serde_json::json!(chunk.chunk_index),
        );
        metadata.insert(
            "start_message".to_string(),
            serde_json::json!(chunk.start_index),
        );
        metadata.insert(
            "end_message".to_string(),
            serde_json::json!(chunk.end_index),
        );
        metadata.insert(
            "message_count".to_string(),
            serde_json::json!(chunk.messages.len()),
        );
        let input = CreateMemoryInput {
            content: chunk.content.clone(),
            memory_type: MemoryType::TranscriptChunk,
            tags: vec!["transcript".to_string(), format!("session:{}", session_id)],
            metadata,
            importance: Some(0.3),
            scope: Default::default(),
            workspace: Some(workspace.to_string()),
            tier: MemoryTier::Daily,
            defer_embedding: false,
            ttl_seconds: Some(config.default_ttl_seconds),
            dedup_mode: Default::default(),
            dedup_threshold: None,
            event_time: None,
            event_duration_seconds: None,
            trigger_pattern: None,
            summary_of_id: None,
            media_url: None,
        };
        let memory = create_memory(conn, &input)?;
        conn.execute(
            r#"
INSERT INTO session_chunks (session_id, memory_id, chunk_index,
start_message_index, end_message_index)
VALUES (?, ?, ?, ?, ?)
"#,
            params![
                session_id,
                memory.id,
                chunk.chunk_index as i64,
                chunk.start_index as i64,
                chunk.end_index as i64,
            ],
        )?;
    }
    tracing::info!(
        session_id = session_id,
        message_count = messages.len(),
        chunk_count = chunks.len(),
        "Indexed conversation"
    );
    Ok(Session {
        session_id: session_id.to_string(),
        title: title.map(String::from),
        agent_id: agent_id.map(String::from),
        started_at,
        last_indexed_at: Some(now),
        message_count: messages.len() as i64,
        chunk_count: chunks.len() as i64,
        workspace: workspace.to_string(),
        // Fix: return the metadata we just persisted. Previously this was an
        // empty map, so the returned Session disagreed with the stored row.
        metadata: session_metadata,
    })
}
/// Incrementally indexes `new_messages` onto an existing session, creating
/// chunk memories only for the new tail of the conversation.
///
/// The previous run's trailing overlap messages (stashed in the session's
/// metadata under "overlap_messages") are prepended before chunking so chunks
/// can span the boundary between runs; stored indices are then shifted back
/// into global conversation coordinates. If the session row is missing (or
/// fails to load), this falls back to a full `index_conversation` with the
/// default workspace and no title/agent.
pub fn index_conversation_delta(
    conn: &Connection,
    session_id: &str,
    new_messages: &[Message],
    config: &ChunkingConfig,
) -> Result<Session> {
    // NOTE(review): `.ok()` below treats any query error the same as
    // "session not found" and silently falls back to full indexing — confirm
    // that is intended.
    let session: Option<Session> = conn
        .query_row(
            "SELECT session_id, title, agent_id, started_at, last_indexed_at,
message_count, chunk_count, workspace, metadata
FROM sessions WHERE session_id = ?",
            params![session_id],
            |row| {
                let started_at: String = row.get(3)?;
                let last_indexed_at: Option<String> = row.get(4)?;
                let metadata_str: String = row.get(8)?;
                Ok(Session {
                    session_id: row.get(0)?,
                    title: row.get(1)?,
                    agent_id: row.get(2)?,
                    // Timestamps are stored as RFC 3339 text; parse failures
                    // degrade to "now" / None rather than erroring.
                    started_at: DateTime::parse_from_rfc3339(&started_at)
                        .map(|dt| dt.with_timezone(&Utc))
                        .unwrap_or_else(|_| Utc::now()),
                    last_indexed_at: last_indexed_at.and_then(|s| {
                        DateTime::parse_from_rfc3339(&s)
                            .map(|dt| dt.with_timezone(&Utc))
                            .ok()
                    }),
                    message_count: row.get(5)?,
                    chunk_count: row.get(6)?,
                    workspace: row.get(7)?,
                    metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
                })
            },
        )
        .ok();
    match session {
        Some(existing) => {
            // Highest message index indexed so far (0 when no chunks exist).
            let last_chunk_end: i64 = conn
                .query_row(
                    "SELECT COALESCE(MAX(end_message_index), 0) FROM session_chunks WHERE session_id = ?",
                    params![session_id],
                    |row| row.get(0),
                )
                .unwrap_or(0);
            // Overlap messages stashed by the previous indexing run; empty if
            // absent or undeserializable.
            let overlap_messages: Vec<Message> = existing
                .metadata
                .get("overlap_messages")
                .and_then(|v| serde_json::from_value(v.clone()).ok())
                .unwrap_or_default();
            let overlap_count = overlap_messages.len();
            // Chunk over overlap + new messages so chunks can straddle the
            // boundary between this run and the previous one.
            let mut all_messages = overlap_messages;
            all_messages.extend(new_messages.iter().cloned());
            let chunks = chunk_conversation(&all_messages, config);
            if chunks.is_empty() {
                // Nothing new to index; return the session unchanged.
                return Ok(existing);
            }
            let now = Utc::now();
            let new_message_count = existing.message_count + new_messages.len() as i64;
            let starting_chunk_index = existing.chunk_count;
            // Re-stash the new trailing overlap for the next delta run.
            let new_overlap: Vec<&Message> = all_messages
                .iter()
                .rev()
                .take(config.overlap_messages)
                .collect::<Vec<_>>()
                .into_iter()
                .rev()
                .collect();
            let mut updated_metadata = existing.metadata.clone();
            updated_metadata.insert(
                "overlap_messages".to_string(),
                serde_json::to_value(&new_overlap).unwrap_or_default(),
            );
            let metadata_json = serde_json::to_string(&updated_metadata)?;
            conn.execute(
                "UPDATE sessions SET last_indexed_at = ?, message_count = ?, chunk_count = ?, metadata = ? WHERE session_id = ?",
                params![
                    now.to_rfc3339(),
                    new_message_count,
                    existing.chunk_count + chunks.len() as i64,
                    metadata_json,
                    session_id,
                ],
            )?;
            // Chunk indices above are relative to `all_messages` (overlap +
            // new); shift by `base_offset` to convert back to global
            // conversation positions.
            let base_offset = (last_chunk_end as usize).saturating_sub(overlap_count);
            for (i, chunk) in chunks.iter().enumerate() {
                let chunk_index = starting_chunk_index as usize + i;
                let global_start = chunk.start_index + base_offset;
                let global_end = chunk.end_index + base_offset;
                let mut metadata = HashMap::new();
                metadata.insert("session_id".to_string(), serde_json::json!(session_id));
                metadata.insert("chunk_index".to_string(), serde_json::json!(chunk_index));
                metadata.insert("start_message".to_string(), serde_json::json!(global_start));
                metadata.insert("end_message".to_string(), serde_json::json!(global_end));
                metadata.insert(
                    "message_count".to_string(),
                    serde_json::json!(chunk.messages.len()),
                );
                let input = CreateMemoryInput {
                    content: chunk.content.clone(),
                    memory_type: MemoryType::TranscriptChunk,
                    tags: vec!["transcript".to_string(), format!("session:{}", session_id)],
                    metadata,
                    importance: Some(0.3),
                    scope: Default::default(),
                    workspace: Some(existing.workspace.clone()),
                    tier: MemoryTier::Daily,
                    defer_embedding: false,
                    ttl_seconds: Some(config.default_ttl_seconds),
                    dedup_mode: Default::default(),
                    dedup_threshold: None,
                    event_time: None,
                    event_duration_seconds: None,
                    trigger_pattern: None,
                    summary_of_id: None,
                    media_url: None,
                };
                let memory = create_memory(conn, &input)?;
                conn.execute(
                    r#"
INSERT INTO session_chunks (session_id, memory_id, chunk_index,
start_message_index, end_message_index)
VALUES (?, ?, ?, ?, ?)
"#,
                    params![
                        session_id,
                        memory.id,
                        chunk_index as i64,
                        global_start as i64,
                        global_end as i64,
                    ],
                )?;
            }
            Ok(Session {
                message_count: new_message_count,
                chunk_count: existing.chunk_count + chunks.len() as i64,
                last_indexed_at: Some(now),
                ..existing
            })
        }
        None => {
            // No existing session: index everything from scratch.
            index_conversation(conn, session_id, new_messages, config, None, None, None)
        }
    }
}
/// Loads a single session row by its id.
///
/// # Errors
/// Returns `EngramError::NotFound` when the query yields no row.
/// NOTE(review): `map_err` collapses *all* query failures — not just a
/// missing row — into `NotFound(0)`, and the `0` is a dummy since sessions
/// are keyed by string; consider distinguishing genuine DB errors.
pub fn get_session(conn: &Connection, session_id: &str) -> Result<Session> {
    conn.query_row(
        "SELECT session_id, title, agent_id, started_at, last_indexed_at,
message_count, chunk_count, workspace, metadata
FROM sessions WHERE session_id = ?",
        params![session_id],
        |row| {
            let started_at: String = row.get(3)?;
            let last_indexed_at: Option<String> = row.get(4)?;
            let metadata_str: String = row.get(8)?;
            Ok(Session {
                session_id: row.get(0)?,
                title: row.get(1)?,
                agent_id: row.get(2)?,
                // Timestamps are RFC 3339 text; parse failures degrade to
                // "now" / None rather than erroring.
                started_at: DateTime::parse_from_rfc3339(&started_at)
                    .map(|dt| dt.with_timezone(&Utc))
                    .unwrap_or_else(|_| Utc::now()),
                last_indexed_at: last_indexed_at.and_then(|s| {
                    DateTime::parse_from_rfc3339(&s)
                        .map(|dt| dt.with_timezone(&Utc))
                        .ok()
                }),
                message_count: row.get(5)?,
                chunk_count: row.get(6)?,
                workspace: row.get(7)?,
                metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
            })
        },
    )
    .map_err(|_| EngramError::NotFound(0))
}
/// Lists sessions newest-first, optionally restricted to one workspace,
/// capped at `limit` rows. Rows that fail to map are skipped rather than
/// failing the whole call.
pub fn list_sessions(
    conn: &Connection,
    workspace: Option<&str>,
    limit: i64,
) -> Result<Vec<Session>> {
    // Map one `sessions` row into a `Session`; bad timestamps and metadata
    // degrade to "now" / None / empty map.
    fn row_to_session(row: &rusqlite::Row<'_>) -> rusqlite::Result<Session> {
        let started_at: String = row.get(3)?;
        let last_indexed_at: Option<String> = row.get(4)?;
        let metadata_str: String = row.get(8)?;
        Ok(Session {
            session_id: row.get(0)?,
            title: row.get(1)?,
            agent_id: row.get(2)?,
            started_at: DateTime::parse_from_rfc3339(&started_at)
                .map(|dt| dt.with_timezone(&Utc))
                .unwrap_or_else(|_| Utc::now()),
            last_indexed_at: last_indexed_at.and_then(|s| {
                DateTime::parse_from_rfc3339(&s)
                    .map(|dt| dt.with_timezone(&Utc))
                    .ok()
            }),
            message_count: row.get(5)?,
            chunk_count: row.get(6)?,
            workspace: row.get(7)?,
            metadata: serde_json::from_str(&metadata_str).unwrap_or_default(),
        })
    }

    // Build the statement incrementally; bound values are boxed so the
    // optional workspace filter and the limit share one parameter list.
    let mut sql = String::from(
        "SELECT session_id, title, agent_id, started_at, last_indexed_at,
message_count, chunk_count, workspace, metadata
FROM sessions",
    );
    let mut bound: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
    if let Some(ws) = workspace {
        sql.push_str(" WHERE workspace = ?");
        bound.push(Box::new(ws.to_string()));
    }
    sql.push_str(" ORDER BY started_at DESC LIMIT ?");
    bound.push(Box::new(limit));
    let args: Vec<&dyn rusqlite::ToSql> = bound.iter().map(|b| b.as_ref()).collect();
    let mut stmt = conn.prepare(&sql)?;
    let rows = stmt.query_map(args.as_slice(), |row| row_to_session(row))?;
    Ok(rows.filter_map(|r| r.ok()).collect())
}
/// Removes a session: soft-deletes its chunk memories (sets `valid_to`), then
/// drops the chunk links and finally the session row itself.
pub fn delete_session(conn: &Connection, session_id: &str) -> Result<()> {
    let args = params![session_id];
    // Order matters: soft-delete the memories while the session_chunks rows
    // that identify them still exist.
    conn.execute(
        r#"
UPDATE memories SET valid_to = datetime('now')
WHERE id IN (SELECT memory_id FROM session_chunks WHERE session_id = ?)
"#,
        args,
    )?;
    conn.execute("DELETE FROM session_chunks WHERE session_id = ?", args)?;
    conn.execute("DELETE FROM sessions WHERE session_id = ?", args)?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    // Builds `count` messages with alternating user/assistant roles, each
    // content being "Message {i} " plus `char_len` filler characters.
    fn make_messages(count: usize, char_len: usize) -> Vec<Message> {
        (0..count)
            .map(|i| Message {
                role: if i % 2 == 0 { "user" } else { "assistant" }.to_string(),
                content: format!("Message {} {}", i, "x".repeat(char_len)),
                timestamp: Utc::now(),
                id: Some(format!("msg-{}", i)),
            })
            .collect()
    }

    // 7 messages, 3 per chunk, overlap 1 => chunks [0,3), [2,5), [4,7):
    // each chunk starts one message before the previous chunk's end.
    #[test]
    fn test_chunk_by_message_count() {
        let config = ChunkingConfig {
            max_messages: 3,
            overlap_messages: 1,
            max_chars: 100000,
            ..Default::default()
        };
        let messages = make_messages(7, 10);
        let chunks = chunk_conversation(&messages, &config);
        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0].start_index, 0);
        assert_eq!(chunks[0].end_index, 3);
        assert_eq!(chunks[1].start_index, 2);
        assert_eq!(chunks[1].end_index, 5);
        assert_eq!(chunks[2].start_index, 4);
        assert_eq!(chunks[2].end_index, 7);
    }

    // Splitting driven by the character budget when the message cap is loose.
    #[test]
    fn test_chunk_by_char_count() {
        let config = ChunkingConfig {
            max_messages: 100,
            overlap_messages: 1,
            max_chars: 100,
            ..Default::default()
        };
        let messages = make_messages(9, 20);
        let chunks = chunk_conversation(&messages, &config);
        assert!(chunks.len() > 1);
        for chunk in &chunks {
            // Slack allows for the "[role]: " prefixes and "\n\n" joiners
            // that format_chunk_content adds beyond the raw message budget.
            assert!(chunk.char_count <= config.max_chars + 50);
        }
    }

    // A single over-budget message is kept but truncated with a marker.
    #[test]
    fn test_truncate_long_message() {
        let config = ChunkingConfig {
            max_messages: 10,
            overlap_messages: 1,
            max_chars: 100,
            ..Default::default()
        };
        let long_content = "x".repeat(200);
        let messages = vec![Message {
            role: "user".to_string(),
            content: long_content,
            timestamp: Utc::now(),
            id: None,
        }];
        let chunks = chunk_conversation(&messages, &config);
        assert_eq!(chunks.len(), 1);
        assert!(chunks[0].messages[0].content.contains("[...truncated...]"));
        assert!(chunks[0].messages[0].content.len() <= config.max_chars);
    }

    // Empty input yields no chunks (and no panic).
    #[test]
    fn test_empty_conversation() {
        let config = ChunkingConfig::default();
        let messages: Vec<Message> = vec![];
        let chunks = chunk_conversation(&messages, &config);
        assert!(chunks.is_empty());
    }

    // Messages render as "[role]: content" joined by blank lines.
    #[test]
    fn test_format_chunk_content() {
        let messages = vec![
            Message {
                role: "user".to_string(),
                content: "Hello".to_string(),
                timestamp: Utc::now(),
                id: None,
            },
            Message {
                role: "assistant".to_string(),
                content: "Hi there!".to_string(),
                timestamp: Utc::now(),
                id: None,
            },
        ];
        let content = format_chunk_content(&messages);
        assert!(content.contains("[user]: Hello"));
        assert!(content.contains("[assistant]: Hi there!"));
    }
}