use std::sync::Arc;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{delete, get, post};
use axum::{Json, Router};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use second_brain_core::query::{QueryEngine, QueryFilters, QueryRequest};
use second_brain_core::schema::{Memory, MemoryType, Relation, RelationType};
use second_brain_core::store::Store;
use crate::ingest::run_ingest;
use crate::AppState;
pub fn router() -> Router<Arc<AppState>> {
Router::new()
.route("/health", get(health))
.route("/v1/recall", post(recall))
.route("/v1/remember", post(remember))
.route("/v1/associate", post(associate))
.route("/v1/memory/{id}", get(get_memory))
.route("/v1/memory/{id}", delete(forget))
.route("/v1/context", post(context))
.route("/v1/ingest", post(ingest))
.route("/v1/graph/traverse", get(traverse))
.route("/v1/status", get(status))
.route("/v1/sync/export", post(sync_export))
.route("/v1/sync/import", post(sync_import))
.route("/v1/sync/status", get(sync_status))
.route("/v1/sync/backfill", post(sync_backfill))
.route("/v1/sync/state", post(set_sync_state))
}
#[derive(Serialize)]
struct HealthResponse {
status: &'static str,
version: &'static str,
}
async fn health() -> impl IntoResponse {
Json(HealthResponse {
status: "ok",
version: env!("CARGO_PKG_VERSION"),
})
}
#[derive(Deserialize)]
struct RecallRequest {
query: String,
limit: Option<usize>,
source: Option<String>,
memory_type: Option<String>,
}
#[derive(Serialize)]
struct RecallResponse {
memories: Vec<MemoryResponse>,
}
#[derive(Serialize)]
struct MemoryResponse {
id: String,
content: String,
memory_type: String,
confidence: f32,
source: String,
created_at: String,
score: Option<f32>,
}
async fn recall(
State(state): State<Arc<AppState>>,
Json(req): Json<RecallRequest>,
) -> impl IntoResponse {
let limit = req.limit.unwrap_or(10);
let query_text = req.query.clone();
let source_filter = req.source.clone();
let type_filter = req.memory_type.as_deref().and_then(parse_memory_type);
let store = state.store.clone();
let embedder = state.embedder.clone();
let result =
tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<(Memory, Option<f32>)>> {
if let Some(ref emb) = embedder {
if let Ok(embedding) = emb.embed(&query_text) {
let engine = QueryEngine::new(store.as_ref());
let request = QueryRequest {
text: query_text.clone(),
embedding,
limit,
filters: QueryFilters {
source: source_filter,
memory_type: type_filter,
min_confidence: None,
entity_names: Vec::new(),
},
};
if let Ok(results) = engine.recall(&request) {
if !results.is_empty() {
for r in &results {
let mut m = r.memory.clone();
m.reinforce();
store.update_memory(&m).ok();
}
return Ok(results
.into_iter()
.map(|r| (r.memory, Some(r.score)))
.collect());
}
}
}
}
let text_results = store.text_search(&query_text, limit).unwrap_or_default();
for m in &text_results {
let mut updated = m.clone();
updated.reinforce();
store.update_memory(&updated).ok();
}
Ok(text_results.into_iter().map(|m| (m, None)).collect())
})
.await;
let memories = match result {
Ok(Ok(m)) => m,
_ => Vec::new(),
};
Json(RecallResponse {
memories: memories
.into_iter()
.map(|(m, score)| to_response(m, score))
.collect(),
})
}
#[derive(Deserialize)]
struct RememberRequest {
content: String,
memory_type: Option<String>,
source: Option<String>,
related_to: Option<Vec<String>>,
}
#[derive(Serialize)]
struct RememberResponse {
id: String,
memory_type: String,
}
async fn remember(
State(state): State<Arc<AppState>>,
Json(req): Json<RememberRequest>,
) -> impl IntoResponse {
let memory_type = req
.memory_type
.as_deref()
.and_then(parse_memory_type)
.unwrap_or(MemoryType::Semantic);
let source = req.source.unwrap_or_else(|| "api".to_string());
let content = req.content.clone();
let related_to = req.related_to.clone();
let store = state.store.clone();
let embedder = state.embedder.clone();
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<(Uuid, MemoryType)> {
let mut memory = Memory::new(content.clone(), memory_type, source, String::new());
if let Some(ref emb) = embedder {
if let Ok(embedding) = emb.embed(&content) {
memory.embedding = embedding;
}
}
let memory_id = memory.id;
retry_write(|| store.store_memory(&memory))?;
if let Some(related) = related_to {
for entity_name in &related {
if let Ok(Some(entity)) = store.find_entity_by_name(entity_name) {
let relation = Relation {
from_id: memory_id,
to_id: entity.id,
relation_type: RelationType::Mentions,
strength: 1.0,
context: None,
};
store.store_relation(&relation).ok();
}
}
}
Ok((memory_id, memory_type))
})
.await;
match result {
Ok(Ok((id, mt))) => Json(RememberResponse {
id: id.to_string(),
memory_type: format!("{:?}", mt).to_lowercase(),
})
.into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct AssociateRequest {
from_id: String,
to_id: String,
relation: String,
context: Option<String>,
}
async fn associate(
State(state): State<Arc<AppState>>,
Json(req): Json<AssociateRequest>,
) -> impl IntoResponse {
let from_id = match Uuid::parse_str(&req.from_id) {
Ok(id) => id,
Err(_) => return (StatusCode::BAD_REQUEST, "invalid from_id").into_response(),
};
let to_id = match Uuid::parse_str(&req.to_id) {
Ok(id) => id,
Err(_) => return (StatusCode::BAD_REQUEST, "invalid to_id").into_response(),
};
let relation_type = match req.relation.as_str() {
"contradicts" => RelationType::Contradicts,
"reinforces" => RelationType::Reinforces,
"supersedes" => RelationType::Supersedes,
_ => RelationType::RelatesTo,
};
let ctx = req.context.clone();
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || {
let relation = Relation {
from_id,
to_id,
relation_type,
strength: 1.0,
context: ctx,
};
retry_write(|| store.store_relation(&relation))
})
.await;
match result {
Ok(Ok(_)) => StatusCode::CREATED.into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
async fn get_memory(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> impl IntoResponse {
let uuid = match Uuid::parse_str(&id) {
Ok(u) => u,
Err(_) => return (StatusCode::BAD_REQUEST, "invalid uuid").into_response(),
};
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || store.get_memory(uuid)).await;
match result {
Ok(Ok(Some(m))) => Json(to_response(m, None)).into_response(),
Ok(Ok(None)) => StatusCode::NOT_FOUND.into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
async fn forget(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> impl IntoResponse {
let uuid = match Uuid::parse_str(&id) {
Ok(u) => u,
Err(_) => return (StatusCode::BAD_REQUEST, "invalid uuid").into_response(),
};
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || store.delete_memory(uuid)).await;
match result {
Ok(Ok(_)) => StatusCode::NO_CONTENT.into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct ContextRequest {
project: String,
depth: Option<u32>,
}
#[derive(Serialize)]
struct ContextResponse {
memories: Vec<ContextMemory>,
}
#[derive(Serialize)]
struct ContextMemory {
id: String,
content: String,
memory_type: String,
confidence: f32,
connected: Vec<String>,
}
async fn context(
State(state): State<Arc<AppState>>,
Json(req): Json<ContextRequest>,
) -> impl IntoResponse {
let project = req.project.clone();
let depth = req.depth.unwrap_or(2);
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<ContextMemory>> {
let memories = store.memories_by_source("claude-code")?;
let project_lower = project.to_lowercase();
let relevant: Vec<&Memory> = memories
.iter()
.filter(|m| {
m.content.to_lowercase().contains(&project_lower)
|| m.source_id.to_lowercase().contains(&project_lower)
})
.take(10)
.collect();
let mut out = Vec::new();
for memory in relevant {
let connected: Vec<String> = store
.traverse(memory.id, depth)
.unwrap_or_default()
.into_iter()
.map(|(m, _)| truncate(&m.content, 100))
.collect();
out.push(ContextMemory {
id: memory.id.to_string(),
content: memory.content.clone(),
memory_type: format!("{:?}", memory.memory_type).to_lowercase(),
confidence: memory.confidence,
connected,
});
}
Ok(out)
})
.await;
match result {
Ok(Ok(memories)) => Json(ContextResponse { memories }).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct IngestRequest {
#[allow(dead_code)]
source: Option<String>,
}
#[derive(Serialize)]
struct IngestResponse {
new_memories: usize,
new_entities: usize,
new_conversations: usize,
skipped_dupes: usize,
unchanged: usize,
errors: usize,
files_processed: usize,
}
async fn ingest(
State(state): State<Arc<AppState>>,
Json(_req): Json<IngestRequest>,
) -> impl IntoResponse {
let store = state.store.clone();
let embedder = state.embedder.clone();
let result = tokio::task::spawn_blocking(move || run_ingest(&store, embedder.as_deref())).await;
match result {
Ok(Ok(stats)) => Json(IngestResponse {
new_memories: stats.new_memories,
new_entities: stats.new_entities,
new_conversations: stats.new_conversations,
skipped_dupes: stats.skipped_dupes,
unchanged: stats.unchanged,
errors: stats.errors,
files_processed: stats.files_processed,
})
.into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct TraverseParams {
start_id: String,
depth: Option<u32>,
}
async fn traverse(
State(state): State<Arc<AppState>>,
Query(params): Query<TraverseParams>,
) -> impl IntoResponse {
let uuid = match Uuid::parse_str(¶ms.start_id) {
Ok(u) => u,
Err(_) => return (StatusCode::BAD_REQUEST, "invalid start_id").into_response(),
};
let depth = params.depth.unwrap_or(2);
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || store.traverse(uuid, depth)).await;
match result {
Ok(Ok(results)) => {
let memories: Vec<MemoryResponse> = results
.into_iter()
.map(|(m, _)| to_response(m, None))
.collect();
Json(serde_json::json!({"memories": memories})).into_response()
}
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Serialize)]
struct StatusResponse {
memory_count: usize,
ingested_file_count: usize,
version: &'static str,
embedder_available: bool,
}
async fn status(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let store = state.store.clone();
let embedder_available = state.embedder.is_some();
let result = tokio::task::spawn_blocking(move || -> (usize, usize) {
let count = store.memory_count().unwrap_or(0);
let files = store.ingested_file_count().unwrap_or(0);
(count, files)
})
.await;
let (memory_count, ingested_file_count) = result.unwrap_or((0, 0));
Json(StatusResponse {
memory_count,
ingested_file_count,
version: env!("CARGO_PKG_VERSION"),
embedder_available,
})
}
#[derive(Deserialize)]
struct SyncExportRequest {
after_seq: Option<u64>,
limit: Option<usize>,
}
#[derive(Serialize)]
struct SyncExportResponse {
records: Vec<serde_json::Value>,
max_seq: u64,
}
async fn sync_export(
State(state): State<Arc<AppState>>,
Json(req): Json<SyncExportRequest>,
) -> impl IntoResponse {
let after_seq = req.after_seq.unwrap_or(0);
let limit = req.limit;
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<SyncExportResponse> {
if limit.is_some() {
let capped = store.sync_log_page(after_seq, limit)?;
let max_seq = capped.last().map(|e| e.seq).unwrap_or(after_seq);
let records: Vec<serde_json::Value> = capped
.iter()
.map(|e| {
let data: Option<serde_json::Value> =
e.data.as_ref().and_then(|d| serde_json::from_str(d).ok());
serde_json::json!({
"seq": e.seq,
"op": e.op,
"node_type": e.node_type,
"node_id": e.node_id,
"machine_id": e.machine_id,
"timestamp": e.timestamp,
"data": data,
})
})
.collect();
Ok(SyncExportResponse { records, max_seq })
} else {
let mut buf = Vec::new();
let max_seq =
second_brain_sync::export::export_changes(store.as_ref(), after_seq, &mut buf)?;
let jsonl = String::from_utf8(buf)?;
let records: Vec<serde_json::Value> = jsonl
.lines()
.filter(|l| !l.trim().is_empty())
.filter_map(|l| serde_json::from_str(l).ok())
.collect();
Ok(SyncExportResponse { records, max_seq })
}
})
.await;
match result {
Ok(Ok(resp)) => Json(resp).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct SyncImportRequest {
records: Vec<serde_json::Value>,
}
#[derive(Serialize)]
struct SyncImportResponse {
created: u64,
updated: u64,
deleted: u64,
skipped: u64,
errors: u64,
max_seq: u64,
}
async fn sync_import(
State(state): State<Arc<AppState>>,
Json(req): Json<SyncImportRequest>,
) -> impl IntoResponse {
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<SyncImportResponse> {
let mut jsonl = String::new();
for record in &req.records {
jsonl.push_str(&serde_json::to_string(record)?);
jsonl.push('\n');
}
let reader = std::io::BufReader::new(jsonl.as_bytes());
let (stats, max_seq) = second_brain_sync::import::import_changes(store.as_ref(), reader)?;
Ok(SyncImportResponse {
created: stats.created,
updated: stats.updated,
deleted: stats.deleted,
skipped: stats.skipped,
errors: stats.errors,
max_seq,
})
})
.await;
match result {
Ok(Ok(resp)) => Json(resp).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Serialize)]
struct SyncPeerStatus {
peer_id: String,
last_seq: u64,
last_sync_at: String,
}
async fn sync_status(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || store.get_all_sync_states()).await;
match result {
Ok(Ok(states)) => {
let peers: Vec<SyncPeerStatus> = states
.into_iter()
.map(|s| SyncPeerStatus {
peer_id: s.peer_id,
last_seq: s.last_seq,
last_sync_at: s.last_sync_at.to_rfc3339(),
})
.collect();
Json(serde_json::json!({"peers": peers})).into_response()
}
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
async fn sync_backfill(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || store.backfill_sync_log()).await;
match result {
Ok(Ok(count)) => Json(serde_json::json!({"backfilled": count})).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct SetSyncStateRequest {
peer_id: String,
last_seq: u64,
}
async fn set_sync_state(
State(state): State<Arc<AppState>>,
Json(req): Json<SetSyncStateRequest>,
) -> impl IntoResponse {
let store = state.store.clone();
let peer_id = req.peer_id;
let last_seq = req.last_seq;
let result = tokio::task::spawn_blocking(move || {
use second_brain_core::schema::SyncState;
store.set_sync_state(&SyncState {
peer_id,
last_seq,
last_sync_at: chrono::Utc::now(),
})
})
.await;
match result {
Ok(Ok(_)) => StatusCode::OK.into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
fn to_response(m: Memory, score: Option<f32>) -> MemoryResponse {
MemoryResponse {
id: m.id.to_string(),
content: m.content,
memory_type: format!("{:?}", m.memory_type).to_lowercase(),
confidence: m.confidence,
source: m.source,
created_at: m.created_at.to_rfc3339(),
score,
}
}
fn parse_memory_type(s: &str) -> Option<MemoryType> {
match s {
"episodic" => Some(MemoryType::Episodic),
"semantic" => Some(MemoryType::Semantic),
"procedural" => Some(MemoryType::Procedural),
_ => None,
}
}
fn truncate(s: &str, max_len: usize) -> String {
if s.len() <= max_len {
s.to_string()
} else {
let mut end = max_len.min(s.len());
while end > 0 && !s.is_char_boundary(end) {
end -= 1;
}
format!("{}...", &s[..end])
}
}
fn retry_write<F, T>(mut op: F) -> anyhow::Result<T>
where
F: FnMut() -> anyhow::Result<T>,
{
let mut last_err = None;
for attempt in 0..5u32 {
match op() {
Ok(v) => return Ok(v),
Err(e) => {
last_err = Some(e);
std::thread::sleep(std::time::Duration::from_millis(50 * 2u64.pow(attempt)));
}
}
}
Err(last_err.unwrap())
}