1use std::time::{SystemTime, UNIX_EPOCH};
12
13use crate::storage::blockchain::{
14 compute_block_hash, verify_chain, Block, VerifyReport, GENESIS_PREV_HASH,
15};
16use crate::storage::schema::Value;
17use crate::storage::unified::UnifiedStore;
18
/// Tag written under a collection's kind config key to mark it as a chain
/// collection; `is_chain` compares the stored value against it.
pub const CHAIN_KIND_TAG: &str = "chain";

/// Reserved column holding the block height.
pub const COL_BLOCK_HEIGHT: &str = "block_height";
/// Reserved column holding the 32-byte hash of the previous block.
pub const COL_PREV_HASH: &str = "prev_hash";
/// Reserved column holding the block timestamp in milliseconds.
pub const COL_TIMESTAMP: &str = "timestamp";
/// Reserved column holding the 32-byte hash of the block itself.
pub const COL_HASH: &str = "hash";

/// Every reserved column name. These columns are left out of the canonical
/// payload that feeds the block hash.
pub const RESERVED_COLUMNS: &[&str] = &[COL_BLOCK_HEIGHT, COL_PREV_HASH, COL_TIMESTAMP, COL_HASH];
30
31#[derive(Debug, Clone, PartialEq, Eq)]
33pub struct ChainTip {
34 pub height: Option<u64>,
38 pub hash: [u8; 32],
39}
40
41impl ChainTip {
42 pub fn empty() -> Self {
43 Self {
44 height: None,
45 hash: GENESIS_PREV_HASH,
46 }
47 }
48
49 pub fn next(&self) -> ([u8; 32], u64) {
51 let next_height = self.height.map(|h| h + 1).unwrap_or(0);
52 (self.hash, next_height)
53 }
54}
55
/// Builds the config key under which a collection's kind tag is stored.
fn kind_key(collection: &str) -> String {
    ["red.collection.", collection, ".kind"].concat()
}
59
60pub fn mark_as_chain(store: &UnifiedStore, collection: &str) {
62 store.set_config_tree(
63 &kind_key(collection),
64 &crate::serde_json::Value::String(CHAIN_KIND_TAG.to_string()),
65 );
66}
67
68pub fn is_chain(store: &UnifiedStore, collection: &str) -> bool {
70 match store.get_config(&kind_key(collection)) {
71 Some(Value::Text(s)) => s.as_ref() == CHAIN_KIND_TAG,
72 _ => false,
73 }
74}
75
/// Tip of a non-empty chain including the tip block's timestamp, as
/// produced by `chain_tip_full`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChainTipFull {
    /// Height of the highest block found.
    pub height: u64,
    /// 32-byte hash of that block.
    pub hash: [u8; 32],
    /// Timestamp of that block in milliseconds; `chain_tip_full` stores `0`
    /// when the timestamp column is absent or not a non-negative integer.
    pub timestamp_ms: u64,
}
84
85pub fn chain_tip_full(store: &UnifiedStore, collection: &str) -> Option<ChainTipFull> {
89 let manager = store.get_collection(collection)?;
90 let mut best: Option<ChainTipFull> = None;
91 for entity in manager.query_all(|_| true) {
92 let crate::storage::unified::EntityData::Row(row) = &entity.data else {
93 continue;
94 };
95 let Some(named) = &row.named else { continue };
96 let height = match named.get(COL_BLOCK_HEIGHT) {
97 Some(Value::UnsignedInteger(v)) => *v,
98 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
99 _ => continue,
100 };
101 let hash = match named.get(COL_HASH) {
102 Some(Value::Blob(b)) if b.len() == 32 => {
103 let mut out = [0u8; 32];
104 out.copy_from_slice(b);
105 out
106 }
107 _ => continue,
108 };
109 let timestamp_ms = match named.get(COL_TIMESTAMP) {
110 Some(Value::UnsignedInteger(v)) => *v,
111 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
112 _ => 0,
113 };
114 match &best {
115 None => {
116 best = Some(ChainTipFull {
117 height,
118 hash,
119 timestamp_ms,
120 });
121 }
122 Some(cur) if height > cur.height => {
123 best = Some(ChainTipFull {
124 height,
125 hash,
126 timestamp_ms,
127 });
128 }
129 _ => {}
130 }
131 }
132 best
133}
134
135pub fn chain_tip(store: &UnifiedStore, collection: &str) -> ChainTip {
138 let Some(manager) = store.get_collection(collection) else {
139 return ChainTip::empty();
140 };
141 let mut best: Option<(u64, [u8; 32])> = None;
142 for entity in manager.query_all(|_| true) {
143 let crate::storage::unified::EntityData::Row(row) = &entity.data else {
144 continue;
145 };
146 let Some(named) = &row.named else {
147 continue;
148 };
149 let height = match named.get(COL_BLOCK_HEIGHT) {
150 Some(Value::UnsignedInteger(v)) => *v,
151 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
152 _ => continue,
153 };
154 let hash = match named.get(COL_HASH) {
155 Some(Value::Blob(b)) if b.len() == 32 => {
156 let mut out = [0u8; 32];
157 out.copy_from_slice(b);
158 out
159 }
160 _ => continue,
161 };
162 match best {
163 None => best = Some((height, hash)),
164 Some((h, _)) if height > h => best = Some((height, hash)),
165 _ => {}
166 }
167 }
168 match best {
169 Some((height, hash)) => ChainTip {
170 height: Some(height),
171 hash,
172 },
173 None => ChainTip::empty(),
174 }
175}
176
/// Renders a 32-byte value as 64 lowercase hexadecimal characters.
fn hex32(bytes: &[u8; 32]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut text = String::with_capacity(bytes.len() * 2);
    for &byte in bytes.iter() {
        // High nibble first, then low nibble.
        text.push(char::from(DIGITS[usize::from(byte >> 4)]));
        text.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    text
}
184
185pub fn chain_conflict_error(
189 tip_height: u64,
190 tip_hash: [u8; 32],
191 tip_timestamp_ms: u64,
192 server_now_ms: u64,
193 reason: &str,
194) -> crate::api::RedDBError {
195 let body = format!(
196 "{{\"block_height\":{},\"hash\":\"{}\",\"timestamp\":{},\"server_time\":{},\"reason\":\"{}\"}}",
197 tip_height,
198 hex32(&tip_hash),
199 tip_timestamp_ms,
200 server_now_ms,
201 reason.replace('"', "'")
202 );
203 crate::api::RedDBError::InvalidOperation(format!("BlockchainConflict:{body}"))
204}
205
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
pub fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
212
213pub fn canonical_payload(fields: &[(String, Value)]) -> Vec<u8> {
217 let mut pairs: Vec<(&str, String)> = fields
218 .iter()
219 .filter(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()))
220 .map(|(k, v)| (k.as_str(), v.plain_text()))
221 .collect();
222 pairs.sort_by(|a, b| a.0.cmp(b.0));
223 let mut out = Vec::new();
224 for (k, v) in pairs {
225 out.extend_from_slice(k.as_bytes());
226 out.push(b'=');
227 out.extend_from_slice(v.as_bytes());
228 out.push(b';');
229 }
230 out
231}
232
233pub fn make_block_reserved_fields(
238 prev_hash: [u8; 32],
239 height: u64,
240 timestamp_ms: u64,
241 payload_canonical: &[u8],
242) -> (Vec<(String, Value)>, [u8; 32]) {
243 let hash = compute_block_hash(&prev_hash, height, timestamp_ms, payload_canonical, None);
244 let fields = vec![
245 (COL_BLOCK_HEIGHT.to_string(), Value::UnsignedInteger(height)),
246 (COL_PREV_HASH.to_string(), Value::Blob(prev_hash.to_vec())),
247 (
248 COL_TIMESTAMP.to_string(),
249 Value::UnsignedInteger(timestamp_ms),
250 ),
251 (COL_HASH.to_string(), Value::Blob(hash.to_vec())),
252 ];
253 (fields, hash)
254}
255
256pub fn collect_blocks(store: &UnifiedStore, collection: &str) -> Option<Vec<Block>> {
263 if !is_chain(store, collection) {
264 return None;
265 }
266 let manager = store.get_collection(collection)?;
267 let mut blocks: Vec<Block> = Vec::new();
268 for entity in manager.query_all(|_| true) {
269 let crate::storage::unified::EntityData::Row(row) = &entity.data else {
270 continue;
271 };
272 let Some(named) = &row.named else { continue };
273 let height = match named.get(COL_BLOCK_HEIGHT) {
274 Some(Value::UnsignedInteger(v)) => *v,
275 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
276 _ => continue,
277 };
278 let prev_hash = match named.get(COL_PREV_HASH) {
279 Some(Value::Blob(b)) if b.len() == 32 => {
280 let mut out = [0u8; 32];
281 out.copy_from_slice(b);
282 out
283 }
284 _ => continue,
285 };
286 let timestamp_ms = match named.get(COL_TIMESTAMP) {
287 Some(Value::UnsignedInteger(v)) => *v,
288 Some(Value::Integer(v)) if *v >= 0 => *v as u64,
289 _ => continue,
290 };
291 let hash = match named.get(COL_HASH) {
292 Some(Value::Blob(b)) if b.len() == 32 => {
293 let mut out = [0u8; 32];
294 out.copy_from_slice(b);
295 out
296 }
297 _ => continue,
298 };
299 let user_fields: Vec<(String, Value)> = named
300 .iter()
301 .filter(|(k, _)| !RESERVED_COLUMNS.contains(&k.as_str()))
302 .map(|(k, v)| (k.clone(), v.clone()))
303 .collect();
304 let payload = canonical_payload(&user_fields);
305 blocks.push(Block {
306 block_height: height,
307 prev_hash,
308 timestamp_ms,
309 payload,
310 signed: None,
311 hash,
312 });
313 }
314 blocks.sort_by_key(|b| b.block_height);
315 Some(blocks)
316}
317
/// Summary of a chain verification run, as produced by
/// `verify_chain_outcome`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VerifyChainOutcome {
    /// Number of blocks that were collected and handed to the verifier.
    pub checked: u64,
    /// `true` when the verifier reported the chain as consistent.
    pub ok: bool,
    /// Block height reported by the verifier for an inconsistent chain;
    /// `None` when the chain is consistent.
    pub first_bad_height: Option<u64>,
}
325
326pub fn verify_chain_outcome(store: &UnifiedStore, collection: &str) -> Option<VerifyChainOutcome> {
329 let blocks = collect_blocks(store, collection)?;
330 let checked = blocks.len() as u64;
331 match verify_chain(&blocks) {
332 VerifyReport::Ok => Some(VerifyChainOutcome {
333 checked,
334 ok: true,
335 first_bad_height: None,
336 }),
337 VerifyReport::Inconsistent { block_height, .. } => Some(VerifyChainOutcome {
338 checked,
339 ok: false,
340 first_bad_height: Some(block_height),
341 }),
342 }
343}
344
/// Builds the config key under which a collection's integrity flag is stored.
fn integrity_key(collection: &str) -> String {
    const PREFIX: &str = "red.collection.";
    const SUFFIX: &str = ".integrity";
    let mut key = String::with_capacity(PREFIX.len() + collection.len() + SUFFIX.len());
    key.push_str(PREFIX);
    key.push_str(collection);
    key.push_str(SUFFIX);
    key
}
348
/// Integrity flag value persisted when a chain is marked as broken.
const INTEGRITY_BROKEN: &str = "broken";
/// Integrity flag value persisted when a chain is marked as not broken.
const INTEGRITY_OK: &str = "ok";
351
352pub fn persist_integrity_flag(store: &UnifiedStore, collection: &str, broken: bool) {
355 let tag = if broken {
356 INTEGRITY_BROKEN
357 } else {
358 INTEGRITY_OK
359 };
360 store.set_config_tree(
361 &integrity_key(collection),
362 &crate::serde_json::Value::String(tag.to_string()),
363 );
364}
365
366pub fn is_integrity_broken_persisted(store: &UnifiedStore, collection: &str) -> Option<bool> {
369 let manager = store.get_collection("red_config")?;
370 let key = integrity_key(collection);
371 let mut latest: Option<(u64, String)> = None;
372 for entity in manager.query_all(|_| true) {
373 let crate::storage::unified::EntityData::Row(row) = &entity.data else {
374 continue;
375 };
376 let Some(named) = &row.named else { continue };
377 let k_match =
378 matches!(named.get("key"), Some(Value::Text(s)) if s.as_ref() == key.as_str());
379 if !k_match {
380 continue;
381 }
382 let Some(Value::Text(v)) = named.get("value") else {
383 continue;
384 };
385 let id = entity.id.raw();
386 if latest.as_ref().map(|(prev, _)| id > *prev).unwrap_or(true) {
387 latest = Some((id, v.as_ref().to_string()));
388 }
389 }
390 latest.map(|(_, tag)| tag == INTEGRITY_BROKEN)
391}
392
393pub fn genesis_fields(timestamp_ms: u64) -> Vec<(String, Value)> {
396 make_block_reserved_fields(GENESIS_PREV_HASH, 0, timestamp_ms, &[]).0
397}
398
#[cfg(test)]
mod tests {
    use super::*;

    /// The reserved set is exactly the four well-known columns.
    #[test]
    fn reserved_columns_complete() {
        assert_eq!(RESERVED_COLUMNS.len(), 4);
        for column in [COL_BLOCK_HEIGHT, COL_PREV_HASH, COL_TIMESTAMP, COL_HASH] {
            assert!(RESERVED_COLUMNS.contains(&column));
        }
    }

    /// An empty chain hands out the genesis previous hash and height zero.
    #[test]
    fn empty_tip_advances_to_genesis_height() {
        let (prev, height) = ChainTip::empty().next();
        assert_eq!(prev, GENESIS_PREV_HASH);
        assert_eq!(height, 0);
    }

    /// A populated tip hands out its own hash and the following height.
    #[test]
    fn tip_with_height_advances_by_one() {
        let tip_hash = [0xAB; 32];
        let tip = ChainTip {
            height: Some(7),
            hash: tip_hash,
        };
        let (prev, height) = tip.next();
        assert_eq!(prev, tip_hash);
        assert_eq!(height, 8);
    }

    /// Genesis fields use an all-zero previous hash and height zero.
    #[test]
    fn genesis_fields_carry_zero_prev_hash() {
        let fields = genesis_fields(1_700_000_000_000);

        let (_, prev) = fields
            .iter()
            .find(|(name, _)| name == COL_PREV_HASH)
            .unwrap();
        let Value::Blob(bytes) = prev else {
            panic!("prev_hash must be Blob")
        };
        assert_eq!(&bytes[..], &[0u8; 32]);

        let (_, height) = fields
            .iter()
            .find(|(name, _)| name == COL_BLOCK_HEIGHT)
            .unwrap();
        assert_eq!(*height, Value::UnsignedInteger(0));
    }

    /// Field order in the input must not influence the canonical bytes.
    #[test]
    fn canonical_payload_is_order_independent() {
        let forward = vec![
            ("user".to_string(), Value::text("alice")),
            ("amount".to_string(), Value::Integer(100)),
        ];
        let reversed: Vec<(String, Value)> = forward.iter().cloned().rev().collect();
        assert_eq!(canonical_payload(&forward), canonical_payload(&reversed));
    }

    /// Reserved columns never leak into the canonical bytes.
    #[test]
    fn canonical_payload_skips_reserved_columns() {
        let fields = vec![
            ("user".to_string(), Value::text("alice")),
            (COL_BLOCK_HEIGHT.to_string(), Value::UnsignedInteger(42)),
            (COL_HASH.to_string(), Value::Blob(vec![0xFF; 32])),
        ];
        let text = String::from_utf8(canonical_payload(&fields)).unwrap();
        assert_eq!(text, "user=alice;");
    }

    /// The returned hash equals a fresh recomputation and the stored blob.
    #[test]
    fn block_hash_matches_recompute() {
        let timestamp_ms = 1_700_000_000_000;
        let payload: &[u8] = b"user=alice;";

        let (fields, hash) =
            make_block_reserved_fields(GENESIS_PREV_HASH, 0, timestamp_ms, payload);
        let recomputed = compute_block_hash(&GENESIS_PREV_HASH, 0, timestamp_ms, payload, None);
        assert_eq!(hash, recomputed);

        let (_, stored) = fields.iter().find(|(name, _)| name == COL_HASH).unwrap();
        let Value::Blob(bytes) = stored else {
            panic!("hash must be Blob")
        };
        assert_eq!(&bytes[..], &hash[..]);
    }
}