use std::sync::Arc;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::{delete, get, post};
use axum::{Json, Router};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use second_brain_core::query::{QueryEngine, QueryFilters, QueryRequest};
use second_brain_core::schema::{Memory, MemoryType, Relation, RelationType};
use second_brain_core::store::Store;
use crate::AppState;
use crate::ingest::run_ingest;
pub fn router() -> Router<Arc<AppState>> {
Router::new()
.route("/health", get(health))
.route("/v1/recall", post(recall))
.route("/v1/remember", post(remember))
.route("/v1/associate", post(associate))
.route("/v1/memory/{id}", get(get_memory))
.route("/v1/memory/{id}", delete(forget))
.route("/v1/context", post(context))
.route("/v1/ingest", post(ingest))
.route("/v1/reingest", post(reingest))
.route("/v1/consolidate/batch", get(consolidate_batch))
.route("/v1/consolidate/store", post(consolidate_store))
.route("/v1/consolidate/cleanup", post(consolidate_cleanup))
.route("/v1/daily/memories", post(daily_memories))
.route("/v1/graph/traverse", get(traverse))
.route("/v1/status", get(status))
.route("/v1/sync/export", post(sync_export))
.route("/v1/sync/import", post(sync_import))
.route("/v1/sync/status", get(sync_status))
.route("/v1/sync/backfill", post(sync_backfill))
.route("/v1/sync/state", post(set_sync_state))
.route("/v1/backfill-project-paths", post(backfill_project_paths))
}
#[derive(Serialize)]
struct HealthResponse {
status: &'static str,
version: &'static str,
}
async fn health() -> impl IntoResponse {
Json(HealthResponse {
status: "ok",
version: env!("CARGO_PKG_VERSION"),
})
}
#[derive(Deserialize)]
struct RecallRequest {
query: String,
limit: Option<usize>,
source: Option<String>,
memory_type: Option<String>,
project: Option<String>,
}
#[derive(Serialize)]
struct RecallResponse {
memories: Vec<MemoryResponse>,
}
#[derive(Serialize)]
struct MemoryResponse {
id: String,
content: String,
memory_type: String,
confidence: f32,
source: String,
created_at: String,
score: Option<f32>,
#[serde(skip_serializing_if = "Option::is_none")]
project_path: Option<String>,
machine_id: String,
machine_name: String,
}
async fn recall(
State(state): State<Arc<AppState>>,
Json(req): Json<RecallRequest>,
) -> impl IntoResponse {
let limit = req.limit.unwrap_or(10);
let query_text = req.query.clone();
let source_filter = req.source.clone();
let type_filter = req.memory_type.as_deref().and_then(parse_memory_type);
let project_filter = req.project.clone();
let store = state.store.clone();
let embedder = state.embedder.clone();
let result =
tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<(Memory, Option<f32>)>> {
if let Some(ref emb) = embedder
&& let Ok(embedding) = emb.embed_query(&query_text)
{
let engine = QueryEngine::new(store.as_ref());
let request = QueryRequest {
text: query_text.clone(),
embedding,
limit,
filters: QueryFilters {
source: source_filter,
memory_type: type_filter,
min_confidence: None,
entity_names: Vec::new(),
project_path: project_filter,
},
};
if let Ok(results) = engine.recall(&request)
&& !results.is_empty()
{
for r in &results {
let mut m = r.memory.clone();
m.reinforce();
store.record_access(&m).ok();
}
return Ok(results
.into_iter()
.map(|r| (r.memory, Some(r.score)))
.collect());
}
}
let text_results = store.text_search(&query_text, limit).unwrap_or_default();
for m in &text_results {
let mut updated = m.clone();
updated.reinforce();
store.record_access(&updated).ok();
}
Ok(text_results.into_iter().map(|m| (m, None)).collect())
})
.await;
let memories = match result {
Ok(Ok(m)) => m,
_ => Vec::new(),
};
let machine_names = state.store.get_all_machines().unwrap_or_default();
Json(RecallResponse {
memories: memories
.into_iter()
.map(|(m, score)| to_response(m, score, &machine_names))
.collect(),
})
}
#[derive(Deserialize)]
struct RememberRequest {
content: String,
memory_type: Option<String>,
source: Option<String>,
related_to: Option<Vec<String>>,
project: Option<String>,
}
#[derive(Serialize)]
struct RememberResponse {
id: String,
memory_type: String,
}
async fn remember(
State(state): State<Arc<AppState>>,
Json(req): Json<RememberRequest>,
) -> impl IntoResponse {
let memory_type = req
.memory_type
.as_deref()
.and_then(parse_memory_type)
.unwrap_or(MemoryType::Semantic);
let source = req.source.unwrap_or_else(|| "api".to_string());
let content = req.content.clone();
let related_to = req.related_to.clone();
let project = req.project.clone();
let machine_id = state.machine.id.clone();
let store = state.store.clone();
let embedder = state.embedder.clone();
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<(Uuid, MemoryType)> {
let mut memory = Memory::new(content.clone(), memory_type, source, String::new())
.with_project_path(project)
.with_machine_id(machine_id);
if let Some(ref emb) = embedder
&& let Ok(embedding) = emb.embed(&content)
{
memory.embedding = embedding;
}
let memory_id = memory.id;
retry_write(|| store.store_memory(&memory))?;
if let Some(related) = related_to {
for entity_name in &related {
if let Ok(Some(entity)) = store.find_entity_by_name(entity_name) {
let relation = Relation {
from_id: memory_id,
to_id: entity.id,
relation_type: RelationType::Mentions,
strength: 1.0,
context: None,
};
store.store_relation(&relation).ok();
}
}
}
Ok((memory_id, memory_type))
})
.await;
match result {
Ok(Ok((id, mt))) => Json(RememberResponse {
id: id.to_string(),
memory_type: format!("{:?}", mt).to_lowercase(),
})
.into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct AssociateRequest {
from_id: String,
to_id: String,
relation: String,
context: Option<String>,
}
async fn associate(
State(state): State<Arc<AppState>>,
Json(req): Json<AssociateRequest>,
) -> impl IntoResponse {
let from_id = match Uuid::parse_str(&req.from_id) {
Ok(id) => id,
Err(_) => return (StatusCode::BAD_REQUEST, "invalid from_id").into_response(),
};
let to_id = match Uuid::parse_str(&req.to_id) {
Ok(id) => id,
Err(_) => return (StatusCode::BAD_REQUEST, "invalid to_id").into_response(),
};
let relation_type = match req.relation.as_str() {
"contradicts" => RelationType::Contradicts,
"reinforces" => RelationType::Reinforces,
"supersedes" => RelationType::Supersedes,
_ => RelationType::RelatesTo,
};
let ctx = req.context.clone();
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || {
let relation = Relation {
from_id,
to_id,
relation_type,
strength: 1.0,
context: ctx,
};
retry_write(|| store.store_relation(&relation))
})
.await;
match result {
Ok(Ok(_)) => StatusCode::CREATED.into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
async fn get_memory(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> impl IntoResponse {
let uuid = match Uuid::parse_str(&id) {
Ok(u) => u,
Err(_) => return (StatusCode::BAD_REQUEST, "invalid uuid").into_response(),
};
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || store.get_memory(uuid)).await;
let machine_names = state.store.get_all_machines().unwrap_or_default();
match result {
Ok(Ok(Some(m))) => Json(to_response(m, None, &machine_names)).into_response(),
Ok(Ok(None)) => StatusCode::NOT_FOUND.into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
async fn forget(State(state): State<Arc<AppState>>, Path(id): Path<String>) -> impl IntoResponse {
let uuid = match Uuid::parse_str(&id) {
Ok(u) => u,
Err(_) => return (StatusCode::BAD_REQUEST, "invalid uuid").into_response(),
};
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || store.delete_memory(uuid)).await;
match result {
Ok(Ok(_)) => StatusCode::NO_CONTENT.into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct ContextRequest {
project: String,
depth: Option<u32>,
}
#[derive(Serialize)]
struct ContextResponse {
memories: Vec<ContextMemory>,
}
#[derive(Serialize)]
struct ContextMemory {
id: String,
content: String,
memory_type: String,
confidence: f32,
connected: Vec<String>,
}
async fn context(
State(state): State<Arc<AppState>>,
Json(req): Json<ContextRequest>,
) -> impl IntoResponse {
let project = req.project.clone();
let depth = req.depth.unwrap_or(2);
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<ContextMemory>> {
let mut relevant_owned = store.memories_by_project_path(&project)?;
if relevant_owned.is_empty() {
let all = store.memories_by_source("claude-code")?;
let project_lower = project.to_lowercase();
relevant_owned = all
.into_iter()
.filter(|m| {
m.content.to_lowercase().contains(&project_lower)
|| m.source_id.to_lowercase().contains(&project_lower)
})
.take(10)
.collect();
}
let relevant: Vec<&Memory> = relevant_owned.iter().collect();
let mut out = Vec::new();
for memory in relevant {
let connected: Vec<String> = store
.traverse(memory.id, depth)
.unwrap_or_default()
.into_iter()
.map(|(m, _)| truncate(&m.content, 100))
.collect();
out.push(ContextMemory {
id: memory.id.to_string(),
content: memory.content.clone(),
memory_type: format!("{:?}", memory.memory_type).to_lowercase(),
confidence: memory.confidence,
connected,
});
}
Ok(out)
})
.await;
match result {
Ok(Ok(memories)) => Json(ContextResponse { memories }).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct IngestRequest {
#[allow(dead_code)]
source: Option<String>,
}
#[derive(Serialize)]
struct IngestResponse {
new_memories: usize,
new_entities: usize,
new_conversations: usize,
skipped_dupes: usize,
unchanged: usize,
errors: usize,
files_processed: usize,
}
async fn ingest(
State(state): State<Arc<AppState>>,
Json(_req): Json<IngestRequest>,
) -> impl IntoResponse {
let store = state.store.clone();
let embedder = state.embedder.clone();
let machine_id = state.machine.id.clone();
let result = tokio::task::spawn_blocking(move || run_ingest(&store, embedder.as_deref(), &machine_id)).await;
match result {
Ok(Ok(stats)) => Json(IngestResponse {
new_memories: stats.new_memories,
new_entities: stats.new_entities,
new_conversations: stats.new_conversations,
skipped_dupes: stats.skipped_dupes,
unchanged: stats.unchanged,
errors: stats.errors,
files_processed: stats.files_processed,
})
.into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct ReingestRequest {
#[serde(default)]
clean: bool,
}
async fn reingest(
State(state): State<Arc<AppState>>,
Json(req): Json<ReingestRequest>,
) -> impl IntoResponse {
let store = state.store.clone();
let embedder = state.embedder.clone();
let machine_id = state.machine.id.clone();
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<serde_json::Value> {
let mut cleaned_claude = 0usize;
let mut cleaned_cursor = 0usize;
if req.clean {
cleaned_claude = store.delete_memories_by_source("claude-code")?;
cleaned_cursor = store.delete_memories_by_source("cursor")?;
}
let cleared_log = store.clear_ingest_log()?;
let stats = run_ingest(&store, embedder.as_deref(), &machine_id)?;
if req.clean {
store.rebuild_vector_index()?;
}
Ok(serde_json::json!({
"cleaned_claude": cleaned_claude,
"cleaned_cursor": cleaned_cursor,
"cleared_log": cleared_log,
"new_memories": stats.new_memories,
"new_entities": stats.new_entities,
"new_conversations": stats.new_conversations,
"skipped_dupes": stats.skipped_dupes,
"unchanged": stats.unchanged,
"errors": stats.errors,
"files_processed": stats.files_processed,
}))
})
.await;
match result {
Ok(Ok(json)) => Json(json).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct ConsolidateBatchParams {
limit: Option<usize>,
}
async fn consolidate_batch(
State(state): State<Arc<AppState>>,
Query(params): Query<ConsolidateBatchParams>,
) -> impl IntoResponse {
let store = state.store.clone();
let limit = params.limit.unwrap_or(50);
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<serde_json::Value> {
let total = store.memory_count()?;
let consolidated = store.consolidation_count()?;
let memories = store.unconsolidated_memories(limit)?;
let mem_json: Vec<serde_json::Value> = memories
.iter()
.map(|m| {
serde_json::json!({
"id": m.id.to_string(),
"content": m.content,
"memory_type": format!("{:?}", m.memory_type).to_lowercase(),
"source": m.source,
})
})
.collect();
Ok(serde_json::json!({
"total": total,
"consolidated": consolidated,
"memories": mem_json,
}))
})
.await;
match result {
Ok(Ok(json)) => Json(json).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct ConsolidateStoreRequest {
raw_id: String,
distilled_content: String,
model: String,
#[serde(default)]
skip: bool,
}
async fn consolidate_store(
State(state): State<Arc<AppState>>,
Json(req): Json<ConsolidateStoreRequest>,
) -> impl IntoResponse {
let store = state.store.clone();
let embedder = state.embedder.clone();
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<serde_json::Value> {
let raw_id = Uuid::parse_str(&req.raw_id)?;
if req.skip {
store.mark_consolidated(raw_id, raw_id, &req.model)?;
return Ok(serde_json::json!({"status": "skipped"}));
}
let mut distilled = Memory::new(
req.distilled_content.clone(),
MemoryType::Semantic,
"consolidation".to_string(),
raw_id.to_string(),
);
if let Some(ref emb) = embedder
&& let Ok(embedding) = emb.embed(&req.distilled_content)
{
distilled.embedding = embedding;
}
store.store_memory(&distilled)?;
let link = Relation {
from_id: distilled.id,
to_id: raw_id,
relation_type: RelationType::DistilledFrom,
strength: 1.0,
context: Some(req.model.clone()),
};
store.store_relation(&link).ok();
store.mark_consolidated(raw_id, distilled.id, &req.model)?;
Ok(serde_json::json!({
"status": "stored",
"distilled_id": distilled.id.to_string(),
}))
})
.await;
match result {
Ok(Ok(json)) => Json(json).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
async fn consolidate_cleanup(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || store.delete_consolidated_raw()).await;
match result {
Ok(Ok(count)) => Json(serde_json::json!({"deleted": count})).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct DailyMemoriesRequest {
date: String,
}
async fn daily_memories(
State(state): State<Arc<AppState>>,
Json(req): Json<DailyMemoriesRequest>,
) -> impl IntoResponse {
let date = match chrono::NaiveDate::parse_from_str(&req.date, "%Y-%m-%d") {
Ok(d) => d,
Err(_) => {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": "invalid date, expected YYYY-MM-DD"})),
)
.into_response();
}
};
let day_start = date.and_hms_opt(0, 0, 0).unwrap().and_utc();
let day_end = (date + chrono::Duration::days(1))
.and_hms_opt(0, 0, 0)
.unwrap()
.and_utc();
let store = state.store.clone();
let result =
tokio::task::spawn_blocking(move || store.memories_created_between(&day_start, &day_end))
.await;
match result {
Ok(Ok(memories)) => {
let count = memories.len();
let items: Vec<serde_json::Value> = memories
.into_iter()
.map(|m| {
serde_json::json!({
"id": m.id.to_string(),
"content": m.content,
"memory_type": format!("{:?}", m.memory_type).to_lowercase(),
"confidence": m.confidence,
"created_at": m.created_at.to_rfc3339(),
"source": m.source,
})
})
.collect();
Json(serde_json::json!({"memories": items, "count": count})).into_response()
}
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct TraverseParams {
start_id: String,
depth: Option<u32>,
}
async fn traverse(
State(state): State<Arc<AppState>>,
Query(params): Query<TraverseParams>,
) -> impl IntoResponse {
let uuid = match Uuid::parse_str(¶ms.start_id) {
Ok(u) => u,
Err(_) => return (StatusCode::BAD_REQUEST, "invalid start_id").into_response(),
};
let depth = params.depth.unwrap_or(2);
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || store.traverse(uuid, depth)).await;
let machine_names = state.store.get_all_machines().unwrap_or_default();
match result {
Ok(Ok(results)) => {
let memories: Vec<MemoryResponse> = results
.into_iter()
.map(|(m, _)| to_response(m, None, &machine_names))
.collect();
Json(serde_json::json!({"memories": memories})).into_response()
}
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Serialize)]
#[allow(dead_code)]
struct StatusResponse {
memory_count: usize,
ingested_file_count: usize,
version: &'static str,
embedder_available: bool,
}
async fn status(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let store = state.store.clone();
let embedder_available = state.embedder.is_some();
let result = tokio::task::spawn_blocking(move || -> (usize, usize, String) {
let count = store.memory_count().unwrap_or(0);
let files = store.ingested_file_count().unwrap_or(0);
let diag = store.diagnostic().unwrap_or_default();
(count, files, diag)
})
.await;
let (memory_count, ingested_file_count, diagnostic) = result.unwrap_or((0, 0, String::new()));
Json(serde_json::json!({
"memory_count": memory_count,
"ingested_file_count": ingested_file_count,
"version": env!("CARGO_PKG_VERSION"),
"embedder_available": embedder_available,
"diagnostic": diagnostic,
}))
}
#[derive(Deserialize)]
struct SyncExportRequest {
after_seq: Option<u64>,
limit: Option<usize>,
}
#[derive(Serialize)]
struct SyncExportResponse {
records: Vec<serde_json::Value>,
max_seq: u64,
}
async fn sync_export(
State(state): State<Arc<AppState>>,
Json(req): Json<SyncExportRequest>,
) -> impl IntoResponse {
let after_seq = req.after_seq.unwrap_or(0);
let limit = req.limit;
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<SyncExportResponse> {
if limit.is_some() {
let capped = store.sync_log_page(after_seq, limit)?;
let mut sync_records: Vec<second_brain_sync::export::SyncRecord> = capped
.iter()
.map(|e| {
let data = e.data.as_ref().and_then(|d| serde_json::from_str(d).ok());
second_brain_sync::export::SyncRecord {
local_seq: e.local_seq as u64,
origin_machine_id: e.origin_machine_id.clone(),
origin_seq: e.origin_seq as u64,
op: e.op.clone(),
node_type: e.node_type.clone(),
node_id: e.node_id.clone(),
timestamp: e.timestamp,
data,
}
})
.collect();
second_brain_sync::export::sort_for_import(&mut sync_records);
let max_seq = sync_records
.iter()
.map(|r| r.local_seq)
.max()
.unwrap_or(after_seq);
let records: Vec<serde_json::Value> = sync_records
.iter()
.map(|r| {
serde_json::json!({
"local_seq": r.local_seq,
"origin_machine_id": r.origin_machine_id,
"origin_seq": r.origin_seq,
"op": r.op,
"node_type": r.node_type,
"node_id": r.node_id,
"timestamp": r.timestamp,
"data": r.data,
})
})
.collect();
Ok(SyncExportResponse { records, max_seq })
} else {
let mut buf = Vec::new();
let max_seq =
second_brain_sync::export::export_changes(store.as_ref(), after_seq, &mut buf)?;
let jsonl = String::from_utf8(buf)?;
let records: Vec<serde_json::Value> = jsonl
.lines()
.filter(|l| !l.trim().is_empty())
.filter_map(|l| serde_json::from_str(l).ok())
.collect();
Ok(SyncExportResponse { records, max_seq })
}
})
.await;
match result {
Ok(Ok(resp)) => Json(resp).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct SyncImportRequest {
records: Vec<serde_json::Value>,
}
#[derive(Serialize)]
struct SyncImportResponse {
created: u64,
updated: u64,
deleted: u64,
skipped: u64,
errors: u64,
max_seq: u64,
}
async fn sync_import(
State(state): State<Arc<AppState>>,
Json(req): Json<SyncImportRequest>,
) -> impl IntoResponse {
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<SyncImportResponse> {
let mut jsonl = String::new();
for record in &req.records {
jsonl.push_str(&serde_json::to_string(record)?);
jsonl.push('\n');
}
let reader = std::io::BufReader::new(jsonl.as_bytes());
let (stats, watermark) = second_brain_sync::import::import_changes(store.as_ref(), reader)?;
Ok(SyncImportResponse {
created: stats.created,
updated: stats.updated,
deleted: stats.deleted,
skipped: stats.skipped,
errors: stats.errors,
max_seq: watermark,
})
})
.await;
match result {
Ok(Ok(resp)) => Json(resp).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Serialize)]
struct SyncPeerStatus {
peer_id: String,
last_seq: u64,
last_sync_at: String,
}
async fn sync_status(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || store.get_all_sync_states()).await;
match result {
Ok(Ok(states)) => {
let peers: Vec<SyncPeerStatus> = states
.into_iter()
.map(|s| SyncPeerStatus {
peer_id: s.peer_id,
last_seq: s.last_seq,
last_sync_at: s.last_sync_at.to_rfc3339(),
})
.collect();
Json(serde_json::json!({"peers": peers})).into_response()
}
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
async fn sync_backfill(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || store.backfill_sync_log()).await;
match result {
Ok(Ok(count)) => Json(serde_json::json!({"backfilled": count})).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
#[derive(Deserialize)]
struct SetSyncStateRequest {
peer_id: String,
last_seq: u64,
}
async fn set_sync_state(
State(state): State<Arc<AppState>>,
Json(req): Json<SetSyncStateRequest>,
) -> impl IntoResponse {
let store = state.store.clone();
let peer_id = req.peer_id;
let last_seq = req.last_seq;
let result = tokio::task::spawn_blocking(move || {
use second_brain_core::schema::SyncState;
store.set_sync_state(&SyncState {
peer_id,
last_seq,
last_sync_at: chrono::Utc::now(),
})
})
.await;
match result {
Ok(Ok(_)) => StatusCode::OK.into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
async fn backfill_project_paths(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let store = state.store.clone();
let result = tokio::task::spawn_blocking(move || store.backfill_project_paths()).await;
match result {
Ok(Ok(count)) => Json(serde_json::json!({"backfilled": count})).into_response(),
Ok(Err(e)) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
fn to_response(
m: Memory,
score: Option<f32>,
machine_names: &std::collections::HashMap<String, String>,
) -> MemoryResponse {
let machine_name = machine_names
.get(&m.machine_id)
.cloned()
.unwrap_or_default();
MemoryResponse {
id: m.id.to_string(),
content: m.content,
memory_type: format!("{:?}", m.memory_type).to_lowercase(),
confidence: m.confidence,
source: m.source,
created_at: m.created_at.to_rfc3339(),
score,
project_path: m.project_path,
machine_id: m.machine_id,
machine_name,
}
}
fn parse_memory_type(s: &str) -> Option<MemoryType> {
match s {
"episodic" => Some(MemoryType::Episodic),
"semantic" => Some(MemoryType::Semantic),
"procedural" => Some(MemoryType::Procedural),
"decision" => Some(MemoryType::Decision),
"architecture" => Some(MemoryType::Architecture),
"debugging" => Some(MemoryType::Debugging),
"task" => Some(MemoryType::Task),
"question" => Some(MemoryType::Question),
_ => None,
}
}
fn truncate(s: &str, max_len: usize) -> String {
if s.len() <= max_len {
s.to_string()
} else {
let mut end = max_len.min(s.len());
while end > 0 && !s.is_char_boundary(end) {
end -= 1;
}
format!("{}...", &s[..end])
}
}
fn retry_write<F, T>(mut op: F) -> anyhow::Result<T>
where
F: FnMut() -> anyhow::Result<T>,
{
let mut last_err = None;
for attempt in 0..5u32 {
match op() {
Ok(v) => return Ok(v),
Err(e) => {
last_err = Some(e);
std::thread::sleep(std::time::Duration::from_millis(50 * 2u64.pow(attempt)));
}
}
}
Err(last_err.unwrap())
}