1use std::time::{SystemTime, UNIX_EPOCH};
12
13use crate::storage::blockchain::{compute_block_hash, verify_chain, Block, GENESIS_PREV_HASH, VerifyReport};
14use crate::storage::schema::Value;
15use crate::storage::unified::UnifiedStore;
16
/// Tag stored under a collection's `kind` config key to mark it as a chain.
/// Written by `mark_as_chain` and compared against by `is_chain`.
pub const CHAIN_KIND_TAG: &str = "chain";

/// Name of the reserved column holding a block's height.
pub const COL_BLOCK_HEIGHT: &str = "block_height";
/// Name of the reserved column holding the previous block's 32-byte hash.
pub const COL_PREV_HASH: &str = "prev_hash";
/// Name of the reserved column holding the block timestamp (handled as
/// milliseconds throughout this module).
pub const COL_TIMESTAMP: &str = "timestamp";
/// Name of the reserved column holding the block's own 32-byte hash.
pub const COL_HASH: &str = "hash";

/// Every column owned by the chain layer. These are filtered out when the
/// canonical payload of a block is built, so they never feed into the
/// payload part of the hash.
pub const RESERVED_COLUMNS: &[&str] = &[COL_BLOCK_HEIGHT, COL_PREV_HASH, COL_TIMESTAMP, COL_HASH];
28
29#[derive(Debug, Clone, PartialEq, Eq)]
31pub struct ChainTip {
32 pub height: Option<u64>,
36 pub hash: [u8; 32],
37}
38
39impl ChainTip {
40 pub fn empty() -> Self {
41 Self {
42 height: None,
43 hash: GENESIS_PREV_HASH,
44 }
45 }
46
47 pub fn next(&self) -> ([u8; 32], u64) {
49 let next_height = self.height.map(|h| h + 1).unwrap_or(0);
50 (self.hash, next_height)
51 }
52}
53
/// Config key under which the kind tag of `collection` is stored:
/// `red.collection.<collection>.kind`.
fn kind_key(collection: &str) -> String {
    ["red.collection.", collection, ".kind"].concat()
}
57
58pub fn mark_as_chain(store: &UnifiedStore, collection: &str) {
60 store.set_config_tree(
61 &kind_key(collection),
62 &crate::serde_json::Value::String(CHAIN_KIND_TAG.to_string()),
63 );
64}
65
66pub fn is_chain(store: &UnifiedStore, collection: &str) -> bool {
68 match store.get_config(&kind_key(collection)) {
69 Some(Value::Text(s)) => s.as_ref() == CHAIN_KIND_TAG,
70 _ => false,
71 }
72}
73
/// Chain tip as produced by `chain_tip_full`: the newest block's height and
/// hash together with its timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChainTipFull {
    /// Height of the newest block.
    pub height: u64,
    /// Hash of the newest block.
    pub hash: [u8; 32],
    /// Timestamp of the newest block in milliseconds; `chain_tip_full` fills
    /// in `0` when the stored row has no usable timestamp.
    pub timestamp_ms: u64,
}
82
83pub fn chain_tip_full(store: &UnifiedStore, collection: &str) -> Option<ChainTipFull> {
87 let manager = store.get_collection(collection)?;
88 let mut best: Option<ChainTipFull> = None;
89 for entity in manager.query_all(|_| true) {
90 let crate::storage::unified::EntityData::Row(row) = &entity.data else {
91 continue;
92 };
93 let Some(named) = &row.named else { continue };
94 let height = match named.get(COL_BLOCK_HEIGHT) {
95 Some(Value::UnsignedInteger(v)) => *v,
96 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
97 _ => continue,
98 };
99 let hash = match named.get(COL_HASH) {
100 Some(Value::Blob(b)) if b.len() == 32 => {
101 let mut out = [0u8; 32];
102 out.copy_from_slice(b);
103 out
104 }
105 _ => continue,
106 };
107 let timestamp_ms = match named.get(COL_TIMESTAMP) {
108 Some(Value::UnsignedInteger(v)) => *v,
109 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
110 _ => 0,
111 };
112 match &best {
113 None => {
114 best = Some(ChainTipFull {
115 height,
116 hash,
117 timestamp_ms,
118 });
119 }
120 Some(cur) if height > cur.height => {
121 best = Some(ChainTipFull {
122 height,
123 hash,
124 timestamp_ms,
125 });
126 }
127 _ => {}
128 }
129 }
130 best
131}
132
133pub fn chain_tip(store: &UnifiedStore, collection: &str) -> ChainTip {
136 let Some(manager) = store.get_collection(collection) else {
137 return ChainTip::empty();
138 };
139 let mut best: Option<(u64, [u8; 32])> = None;
140 for entity in manager.query_all(|_| true) {
141 let crate::storage::unified::EntityData::Row(row) = &entity.data else {
142 continue;
143 };
144 let Some(named) = &row.named else {
145 continue;
146 };
147 let height = match named.get(COL_BLOCK_HEIGHT) {
148 Some(Value::UnsignedInteger(v)) => *v,
149 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
150 _ => continue,
151 };
152 let hash = match named.get(COL_HASH) {
153 Some(Value::Blob(b)) if b.len() == 32 => {
154 let mut out = [0u8; 32];
155 out.copy_from_slice(b);
156 out
157 }
158 _ => continue,
159 };
160 match best {
161 None => best = Some((height, hash)),
162 Some((h, _)) if height > h => best = Some((height, hash)),
163 _ => {}
164 }
165 }
166 match best {
167 Some((height, hash)) => ChainTip {
168 height: Some(height),
169 hash,
170 },
171 None => ChainTip::empty(),
172 }
173}
174
/// Encodes 32 bytes as a 64-character lowercase hexadecimal string.
///
/// Digits are looked up in a table and pushed directly, so the only
/// allocation is the pre-sized output string (no temporary `String` per
/// byte).
fn hex32(bytes: &[u8; 32]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut s = String::with_capacity(64);
    for &b in bytes {
        // High nibble first, then low nibble, matching `{:02x}`.
        s.push(char::from(HEX_DIGITS[usize::from(b >> 4)]));
        s.push(char::from(HEX_DIGITS[usize::from(b & 0x0f)]));
    }
    s
}
182
183pub fn chain_conflict_error(
187 tip_height: u64,
188 tip_hash: [u8; 32],
189 tip_timestamp_ms: u64,
190 server_now_ms: u64,
191 reason: &str,
192) -> crate::api::RedDBError {
193 let body = format!(
194 "{{\"block_height\":{},\"hash\":\"{}\",\"timestamp\":{},\"server_time\":{},\"reason\":\"{}\"}}",
195 tip_height,
196 hex32(&tip_hash),
197 tip_timestamp_ms,
198 server_now_ms,
199 reason.replace('"', "'")
200 );
201 crate::api::RedDBError::InvalidOperation(format!("BlockchainConflict:{body}"))
202}
203
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
210
211pub fn canonical_payload(fields: &[(String, Value)]) -> Vec<u8> {
215 let mut pairs: Vec<(&str, String)> = fields
216 .iter()
217 .filter(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()))
218 .map(|(k, v)| (k.as_str(), v.plain_text()))
219 .collect();
220 pairs.sort_by(|a, b| a.0.cmp(b.0));
221 let mut out = Vec::new();
222 for (k, v) in pairs {
223 out.extend_from_slice(k.as_bytes());
224 out.push(b'=');
225 out.extend_from_slice(v.as_bytes());
226 out.push(b';');
227 }
228 out
229}
230
231pub fn make_block_reserved_fields(
236 prev_hash: [u8; 32],
237 height: u64,
238 timestamp_ms: u64,
239 payload_canonical: &[u8],
240) -> (Vec<(String, Value)>, [u8; 32]) {
241 let hash = compute_block_hash(&prev_hash, height, timestamp_ms, payload_canonical, None);
242 let fields = vec![
243 (
244 COL_BLOCK_HEIGHT.to_string(),
245 Value::UnsignedInteger(height),
246 ),
247 (COL_PREV_HASH.to_string(), Value::Blob(prev_hash.to_vec())),
248 (
249 COL_TIMESTAMP.to_string(),
250 Value::UnsignedInteger(timestamp_ms),
251 ),
252 (COL_HASH.to_string(), Value::Blob(hash.to_vec())),
253 ];
254 (fields, hash)
255}
256
257pub fn collect_blocks(store: &UnifiedStore, collection: &str) -> Option<Vec<Block>> {
264 if !is_chain(store, collection) {
265 return None;
266 }
267 let manager = store.get_collection(collection)?;
268 let mut blocks: Vec<Block> = Vec::new();
269 for entity in manager.query_all(|_| true) {
270 let crate::storage::unified::EntityData::Row(row) = &entity.data else {
271 continue;
272 };
273 let Some(named) = &row.named else { continue };
274 let height = match named.get(COL_BLOCK_HEIGHT) {
275 Some(Value::UnsignedInteger(v)) => *v,
276 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
277 _ => continue,
278 };
279 let prev_hash = match named.get(COL_PREV_HASH) {
280 Some(Value::Blob(b)) if b.len() == 32 => {
281 let mut out = [0u8; 32];
282 out.copy_from_slice(b);
283 out
284 }
285 _ => continue,
286 };
287 let timestamp_ms = match named.get(COL_TIMESTAMP) {
288 Some(Value::UnsignedInteger(v)) => *v,
289 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
290 _ => continue,
291 };
292 let hash = match named.get(COL_HASH) {
293 Some(Value::Blob(b)) if b.len() == 32 => {
294 let mut out = [0u8; 32];
295 out.copy_from_slice(b);
296 out
297 }
298 _ => continue,
299 };
300 let user_fields: Vec<(String, Value)> = named
301 .iter()
302 .filter(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()))
303 .map(|(k, v)| (k.clone(), v.clone()))
304 .collect();
305 let payload = canonical_payload(&user_fields);
306 blocks.push(Block {
307 block_height: height,
308 prev_hash,
309 timestamp_ms,
310 payload,
311 signed: None,
312 hash,
313 });
314 }
315 blocks.sort_by_key(|b| b.block_height);
316 Some(blocks)
317}
318
/// Summary of a chain verification run, as produced by `verify_chain_outcome`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VerifyChainOutcome {
    /// Number of blocks that were collected and handed to the verifier.
    pub checked: u64,
    /// `true` when the verifier reported the chain as consistent.
    pub ok: bool,
    /// Block height reported by the verifier for an inconsistent chain;
    /// `None` when the chain verified cleanly.
    pub first_bad_height: Option<u64>,
}
326
327pub fn verify_chain_outcome(store: &UnifiedStore, collection: &str) -> Option<VerifyChainOutcome> {
330 let blocks = collect_blocks(store, collection)?;
331 let checked = blocks.len() as u64;
332 match verify_chain(&blocks) {
333 VerifyReport::Ok => Some(VerifyChainOutcome {
334 checked,
335 ok: true,
336 first_bad_height: None,
337 }),
338 VerifyReport::Inconsistent { block_height, .. } => Some(VerifyChainOutcome {
339 checked,
340 ok: false,
341 first_bad_height: Some(block_height),
342 }),
343 }
344}
345
/// Config key under which the integrity flag of `collection` is stored:
/// `red.collection.<collection>.integrity`.
fn integrity_key(collection: &str) -> String {
    ["red.collection.", collection, ".integrity"].concat()
}
349
/// Integrity flag value written when a chain is recorded as broken.
const INTEGRITY_BROKEN: &str = "broken";
/// Integrity flag value written when a chain is recorded as intact.
const INTEGRITY_OK: &str = "ok";
352
353pub fn persist_integrity_flag(store: &UnifiedStore, collection: &str, broken: bool) {
356 let tag = if broken { INTEGRITY_BROKEN } else { INTEGRITY_OK };
357 store.set_config_tree(
358 &integrity_key(collection),
359 &crate::serde_json::Value::String(tag.to_string()),
360 );
361}
362
363pub fn is_integrity_broken_persisted(store: &UnifiedStore, collection: &str) -> Option<bool> {
366 let manager = store.get_collection("red_config")?;
367 let key = integrity_key(collection);
368 let mut latest: Option<(u64, String)> = None;
369 for entity in manager.query_all(|_| true) {
370 let crate::storage::unified::EntityData::Row(row) = &entity.data else {
371 continue;
372 };
373 let Some(named) = &row.named else { continue };
374 let k_match = matches!(named.get("key"), Some(Value::Text(s)) if s.as_ref() == key.as_str());
375 if !k_match {
376 continue;
377 }
378 let Some(Value::Text(v)) = named.get("value") else {
379 continue;
380 };
381 let id = entity.id.raw();
382 if latest.as_ref().map(|(prev, _)| id > *prev).unwrap_or(true) {
383 latest = Some((id, v.as_ref().to_string()));
384 }
385 }
386 latest.map(|(_, tag)| tag == INTEGRITY_BROKEN)
387}
388
389pub fn genesis_fields(timestamp_ms: u64) -> Vec<(String, Value)> {
392 make_block_reserved_fields(GENESIS_PREV_HASH, 0, timestamp_ms, &[]).0
393}
394
// Unit tests for the pure helpers of this module; none of them needs a store.
#[cfg(test)]
mod tests {
    use super::*;

    // Guards against adding a reserved column constant without listing it.
    #[test]
    fn reserved_columns_complete() {
        assert_eq!(RESERVED_COLUMNS.len(), 4);
        assert!(RESERVED_COLUMNS.contains(&COL_BLOCK_HEIGHT));
        assert!(RESERVED_COLUMNS.contains(&COL_PREV_HASH));
        assert!(RESERVED_COLUMNS.contains(&COL_TIMESTAMP));
        assert!(RESERVED_COLUMNS.contains(&COL_HASH));
    }

    // An empty chain hands out the genesis slot: genesis prev-hash, height 0.
    #[test]
    fn empty_tip_advances_to_genesis_height() {
        let tip = ChainTip::empty();
        let (prev, height) = tip.next();
        assert_eq!(prev, GENESIS_PREV_HASH);
        assert_eq!(height, 0);
    }

    // A non-empty tip hands out its own hash and the next height.
    #[test]
    fn tip_with_height_advances_by_one() {
        let tip = ChainTip {
            height: Some(7),
            hash: [0xAB; 32],
        };
        let (prev, height) = tip.next();
        assert_eq!(prev, [0xAB; 32]);
        assert_eq!(height, 8);
    }

    // Genesis fields must carry an all-zero prev-hash blob and height 0.
    #[test]
    fn genesis_fields_carry_zero_prev_hash() {
        let fields = genesis_fields(1_700_000_000_000);
        let prev = fields.iter().find(|(k, _)| k == COL_PREV_HASH).unwrap();
        match &prev.1 {
            Value::Blob(b) => assert_eq!(&b[..], &[0u8; 32]),
            _ => panic!("prev_hash must be Blob"),
        }
        let height = fields.iter().find(|(k, _)| k == COL_BLOCK_HEIGHT).unwrap();
        assert_eq!(height.1, Value::UnsignedInteger(0));
    }

    // The same fields in a different order must serialise to the same bytes.
    #[test]
    fn canonical_payload_is_order_independent() {
        let a = vec![
            ("user".to_string(), Value::text("alice")),
            ("amount".to_string(), Value::Integer(100)),
        ];
        let b = vec![
            ("amount".to_string(), Value::Integer(100)),
            ("user".to_string(), Value::text("alice")),
        ];
        assert_eq!(canonical_payload(&a), canonical_payload(&b));
    }

    // Reserved columns never leak into the payload; this also pins the
    // `name=value;` wire format.
    #[test]
    fn canonical_payload_skips_reserved_columns() {
        let fields = vec![
            ("user".to_string(), Value::text("alice")),
            (
                COL_BLOCK_HEIGHT.to_string(),
                Value::UnsignedInteger(42),
            ),
            (COL_HASH.to_string(), Value::Blob(vec![0xFF; 32])),
        ];
        let bytes = canonical_payload(&fields);
        let s = String::from_utf8(bytes).unwrap();
        assert_eq!(s, "user=alice;");
    }

    // The returned hash equals a direct `compute_block_hash` call and is the
    // same value stored in the hash column.
    #[test]
    fn block_hash_matches_recompute() {
        let (fields, hash) = make_block_reserved_fields(
            GENESIS_PREV_HASH,
            0,
            1_700_000_000_000,
            b"user=alice;",
        );
        let recomputed =
            compute_block_hash(&GENESIS_PREV_HASH, 0, 1_700_000_000_000, b"user=alice;", None);
        assert_eq!(hash, recomputed);
        let stored = fields.iter().find(|(k, _)| k == COL_HASH).unwrap();
        match &stored.1 {
            Value::Blob(b) => assert_eq!(&b[..], &hash[..]),
            _ => panic!("hash must be Blob"),
        }
    }
}