Skip to main content

haystack_server/ops/
changes.rs

1//! The `changes` op — return graph changelog entries since a given version.
2//!
3//! Used by federation connectors for incremental delta sync instead of full
4//! `read("*")` on every interval.
5//!
6//! # Request Grid Columns
7//!
8//! | Column    | Kind   | Description                                     |
9//! |-----------|--------|-------------------------------------------------|
10//! | `version` | Number | Graph version to query changes since (0 for all) |
11//!
12//! # Response
13//!
14//! Grid meta contains `curVer` (Number) — the current graph version.
15//!
//! | Column    | Kind   | Description                                                       |
//! |-----------|--------|-------------------------------------------------------------------|
//! | `version` | Number | Version after this mutation                                       |
//! | `op`      | Str    | `"add"`, `"update"`, or `"remove"`                                |
//! | `ref`     | Str    | Entity ref value                                                  |
//! | `ts`      | Number | Timestamp recorded for this mutation (unitless)                   |
//! | `entity`  | Dict   | Full entity for add, changed tags for update; absent otherwise    |
22//!
23//! # Errors
24//!
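//! - **400 Bad Request** — request grid decode failure.
//! - **500 Internal Server Error** — encoding error.
//!
//! A changelog gap (the requested version is older than the retained
//! changelog) is *not* reported as an HTTP error: the response is a 200 with
//! an empty grid whose meta carries `curVer` and an `err` Str describing the
//! gap, signalling that the caller should perform a full resync.
//!
//! When more rows are available than the per-response limit, the grid meta
//! additionally carries a `truncated` marker and `maxRows` (Number).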
27
28use actix_web::{HttpRequest, HttpResponse, web};
29
30use haystack_core::data::{HCol, HDict, HGrid};
31use haystack_core::graph::changelog::DiffOp;
32use haystack_core::kinds::Kind;
33
34use crate::content;
35use crate::error::HaystackError;
36use crate::state::AppState;
37
/// Upper bound on how many changelog rows a single `changes` response may
/// carry. Responses that hit this limit are flagged in the grid meta with a
/// `truncated` marker and a `maxRows` number equal to this value.
const MAX_CHANGE_ROWS: usize = 10_000;
40
41/// POST /api/changes
42///
43/// Request grid should have a single row with a `version` column (Number).
44/// Returns a grid of changelog entries since that version, each with:
45/// - `version`: Number — the version after the mutation
46/// - `op`: Str — "add", "update", or "remove"
47/// - `ref`: Str — the entity ref value
48/// - `entity`: the entity dict (for add/update; absent for remove)
49///
50/// Also includes `curVer` in the response meta with the current graph version,
51/// so the caller can store it for the next delta sync.
52pub async fn handle(
53    req: HttpRequest,
54    body: String,
55    state: web::Data<AppState>,
56) -> Result<HttpResponse, HaystackError> {
57    let content_type = req
58        .headers()
59        .get("Content-Type")
60        .and_then(|v| v.to_str().ok())
61        .unwrap_or("");
62    let accept = req
63        .headers()
64        .get("Accept")
65        .and_then(|v| v.to_str().ok())
66        .unwrap_or("");
67
68    let request_grid = content::decode_request_grid(&body, content_type)
69        .map_err(|e| HaystackError::bad_request(format!("failed to decode request: {e}")))?;
70
71    let since_version = request_grid
72        .row(0)
73        .and_then(|row| row.get("version"))
74        .and_then(|k| {
75            if let Kind::Number(n) = k {
76                Some(n.val as u64)
77            } else {
78                None
79            }
80        })
81        .unwrap_or(0);
82
83    let current_version = state.graph.version();
84    let diffs = match state.graph.changes_since(since_version) {
85        Ok(d) => d,
86        Err(gap) => {
87            // Subscriber fell behind. Return an error grid with the gap info
88            // so the caller knows to do a full resync.
89            let mut err_meta = HDict::new();
90            err_meta.set(
91                "curVer",
92                Kind::Number(haystack_core::kinds::Number::unitless(
93                    current_version as f64,
94                )),
95            );
96            err_meta.set(
97                "err",
98                Kind::Str(format!(
99                    "changelog gap: requested version {}, floor is {}",
100                    gap.subscriber_version, gap.floor_version
101                )),
102            );
103            let grid = HGrid::from_parts(err_meta, Vec::new(), Vec::new());
104            let (encoded, ct) = content::encode_response_grid(&grid, accept)
105                .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
106            return Ok(HttpResponse::Ok().content_type(ct).body(encoded));
107        }
108    };
109
110    let mut meta = HDict::new();
111    meta.set(
112        "curVer",
113        Kind::Number(haystack_core::kinds::Number::unitless(
114            current_version as f64,
115        )),
116    );
117
118    if diffs.is_empty() {
119        let grid = HGrid::from_parts(meta, Vec::new(), Vec::new());
120        let (encoded, ct) = content::encode_response_grid(&grid, accept)
121            .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
122        return Ok(HttpResponse::Ok().content_type(ct).body(encoded));
123    }
124
125    let cols = vec![
126        HCol::new("version"),
127        HCol::new("op"),
128        HCol::new("ref"),
129        HCol::new("ts"),
130        HCol::new("entity"),
131    ];
132
133    let mut rows: Vec<HDict> = diffs
134        .iter()
135        .map(|diff| {
136            let mut row = HDict::new();
137            row.set(
138                "version",
139                Kind::Number(haystack_core::kinds::Number::unitless(diff.version as f64)),
140            );
141            row.set(
142                "op",
143                Kind::Str(match diff.op {
144                    DiffOp::Add => "add".to_string(),
145                    DiffOp::Update => "update".to_string(),
146                    DiffOp::Remove => "remove".to_string(),
147                }),
148            );
149            row.set("ref", Kind::Str(diff.ref_val.clone()));
150            row.set(
151                "ts",
152                Kind::Number(haystack_core::kinds::Number::unitless(
153                    diff.timestamp as f64,
154                )),
155            );
156            // Include entity data: full entity for Add, changed_tags for Update.
157            if let Some(entity) = &diff.new {
158                row.set("entity", Kind::Dict(Box::new(entity.clone())));
159            } else if let Some(changed) = &diff.changed_tags {
160                row.set("entity", Kind::Dict(Box::new(changed.clone())));
161            }
162            row
163        })
164        .collect();
165
166    let truncated = rows.len() > MAX_CHANGE_ROWS;
167    if truncated {
168        rows.truncate(MAX_CHANGE_ROWS);
169        meta.set("truncated", Kind::Marker);
170        meta.set(
171            "maxRows",
172            Kind::Number(haystack_core::kinds::Number::unitless(
173                MAX_CHANGE_ROWS as f64,
174            )),
175        );
176    }
177
178    let grid = HGrid::from_parts(meta, cols, rows);
179    let (encoded, ct) = content::encode_response_grid(&grid, accept)
180        .map_err(|e| HaystackError::internal(format!("encoding error: {e}")))?;
181
182    Ok(HttpResponse::Ok().content_type(ct).body(encoded))
183}