1use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11
12use arrow::record_batch::RecordBatch;
13use tokio::sync::RwLock;
14use tracing::info;
15
16use apiary_core::config::NodeConfig;
17use apiary_core::error::ApiaryError;
18use apiary_core::registry_manager::RegistryManager;
19use apiary_core::storage::StorageBackend;
20use apiary_core::{CellSizingPolicy, FrameSchema, LedgerAction, Result, WriteResult};
21use apiary_query::ApiaryQueryContext;
22use apiary_storage::cell_reader::CellReader;
23use apiary_storage::cell_writer::CellWriter;
24use apiary_storage::ledger::Ledger;
25use apiary_storage::local::LocalBackend;
26use apiary_storage::s3::S3Backend;
27
28use crate::bee::{BeePool, BeeStatus};
29use crate::behavioral::{AbandonmentTracker, ColonyThermometer};
30use crate::cache::CellCache;
31use crate::heartbeat::{HeartbeatWriter, NodeState, WorldView, WorldViewBuilder};
32
/// A single Apiary node: storage access, catalog, query engine, worker pool,
/// cell cache, and the background tasks that keep it visible to the swarm.
///
/// Constructed via [`ApiaryNode::start`]; stopped via [`ApiaryNode::shutdown`].
pub struct ApiaryNode {
    /// Node configuration resolved at startup (cores, memory, sizes, ids).
    pub config: NodeConfig,

    /// Shared storage backend — local filesystem or S3, selected from
    /// `config.storage_uri` in `start()`.
    pub storage: Arc<dyn StorageBackend>,

    /// Catalog manager (hives/boxes/frames) persisted on `storage`.
    pub registry: Arc<RegistryManager>,

    /// SQL query context; serialized behind a Mutex — `sql()` takes the lock
    /// on a bee worker thread, the query poller takes it in the background.
    pub query_ctx: Arc<tokio::sync::Mutex<ApiaryQueryContext>>,

    /// Pool of worker ("bee") threads used for CPU-bound query execution.
    pub bee_pool: Arc<BeePool>,

    /// Local cache of cells, capped at `config.max_cache_size` bytes.
    pub cell_cache: Arc<CellCache>,

    /// Measures "colony temperature" (a load metric) from the bee pool;
    /// sampled after each write and in `colony_status()`.
    pub thermometer: ColonyThermometer,

    /// Tracker for abandonment behavior (see `crate::behavioral`); not read
    /// anywhere in this file.
    pub abandonment_tracker: Arc<AbandonmentTracker>,

    /// Publishes this node's heartbeat; also used to delete the heartbeat on
    /// graceful shutdown.
    heartbeat_writer: Arc<HeartbeatWriter>,

    /// Latest view of the swarm, refreshed by the world-view background task.
    world_view: Arc<RwLock<WorldView>>,

    /// Held so the builder shares its lifetime with the node; the refresh
    /// task owns its own clone.
    #[allow(dead_code)]
    world_view_builder: Arc<WorldViewBuilder>,

    /// Broadcasts the shutdown signal (`true`) to all background tasks.
    cancel_tx: tokio::sync::watch::Sender<bool>,
}
77
impl ApiaryNode {
    /// Boot a node: resolve the storage backend from `config.storage_uri`,
    /// load (or create) the registry, initialize the bee pool and cell cache,
    /// publish an initial heartbeat, and spawn the heartbeat, world-view, and
    /// distributed-query background tasks.
    ///
    /// # Errors
    /// Propagates storage, registry, cache, heartbeat, or world-view
    /// initialization failures. Registry loading is retried up to 5 times
    /// with exponential backoff before the last error is returned.
    pub async fn start(config: NodeConfig) -> Result<Self> {
        // Select the storage backend from the URI scheme: "s3://" goes to S3;
        // everything else (with or without a "local://" prefix) is treated as
        // a local filesystem path.
        let storage: Arc<dyn StorageBackend> = if config.storage_uri.starts_with("s3://") {
            Arc::new(S3Backend::new(&config.storage_uri)?)
        } else {
            let path = config
                .storage_uri
                .strip_prefix("local://")
                .unwrap_or(&config.storage_uri);

            // Expand a leading "~/" (or "~\" on Windows) to the user's home
            // directory; other paths are used verbatim.
            let expanded = if path.starts_with("~/") || path.starts_with("~\\") {
                let home = home_dir().ok_or_else(|| ApiaryError::Config {
                    message: "Cannot determine home directory".to_string(),
                })?;
                // &path[2..] skips the "~/" prefix; safe to byte-slice because
                // the prefix was just checked to be two ASCII characters.
                home.join(&path[2..])
            } else {
                std::path::PathBuf::from(path)
            };

            Arc::new(LocalBackend::new(expanded).await?)
        };

        info!(
            node_id = %config.node_id,
            cores = config.cores,
            memory_mb = config.memory_bytes / (1024 * 1024),
            memory_per_bee_mb = config.memory_per_bee / (1024 * 1024),
            target_cell_size_mb = config.target_cell_size / (1024 * 1024),
            storage_uri = %config.storage_uri,
            "Apiary node started"
        );

        let registry = Arc::new(RegistryManager::new(Arc::clone(&storage)));
        // Registry bootstrap with retry: transient storage errors back off
        // 1s, 2s, 4s, 8s (capped at 10s) across 5 attempts before the last
        // error is propagated. `last_err` is cleared on success so only a
        // fully failed loop returns Err.
        {
            let max_retries: u32 = 5;
            let mut delay = Duration::from_secs(1);
            let mut last_err = None;
            for attempt in 1..=max_retries {
                match registry.load_or_create().await {
                    Ok(_) => {
                        last_err = None;
                        break;
                    }
                    Err(e) => {
                        tracing::warn!(
                            attempt,
                            max_retries,
                            error = %e,
                            "Registry initialization failed, retrying"
                        );
                        last_err = Some(e);
                        if attempt < max_retries {
                            tokio::time::sleep(delay).await;
                            delay = (delay * 2).min(Duration::from_secs(10));
                        }
                    }
                }
            }
            if let Some(e) = last_err {
                return Err(e);
            }
        }
        info!("Registry loaded");

        let query_ctx = Arc::new(tokio::sync::Mutex::new(ApiaryQueryContext::with_node_id(
            Arc::clone(&storage),
            Arc::clone(&registry),
            config.node_id.clone(),
        )));

        let bee_pool = Arc::new(BeePool::new(&config));
        info!(bees = config.cores, "Bee pool initialized");

        // Cells are cached under "<cache_dir>/cells", bounded by max_cache_size.
        let cache_dir = config.cache_dir.join("cells");
        let cell_cache =
            Arc::new(CellCache::new(cache_dir, config.max_cache_size, Arc::clone(&storage)).await?);
        info!(
            max_cache_mb = config.max_cache_size / (1024 * 1024),
            "Cell cache initialized"
        );

        let heartbeat_writer = Arc::new(HeartbeatWriter::new(
            Arc::clone(&storage),
            &config,
            Arc::clone(&bee_pool),
            Arc::clone(&cell_cache),
        ));

        let world_view_builder = Arc::new(WorldViewBuilder::new(
            Arc::clone(&storage),
            config.heartbeat_interval, config.dead_threshold,
        ));
        let world_view = world_view_builder.world_view();

        // Publish one heartbeat and build one world view synchronously so the
        // node is visible to (and can see) peers before `start` returns.
        heartbeat_writer.write_once().await?;
        world_view_builder.poll_once().await?;
        info!("Initial heartbeat written and world view built");

        // A single watch channel fans the shutdown signal out to every
        // background task; `shutdown()` flips it to `true`.
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);

        // Background task: periodic heartbeat writes.
        {
            let writer = Arc::clone(&heartbeat_writer);
            let rx = cancel_rx.clone();
            tokio::spawn(async move {
                writer.run(rx).await;
            });
        }

        // Background task: periodic world-view refresh from peer heartbeats.
        {
            let builder = Arc::clone(&world_view_builder);
            let rx = cancel_rx.clone();
            tokio::spawn(async move {
                builder.run(rx).await;
            });
        }

        // Background task: poll for distributed-query manifests addressed to
        // this node and execute the local task fragments (see
        // `run_query_worker_poller`).
        {
            let storage = Arc::clone(&storage);
            let query_ctx = Arc::clone(&query_ctx);
            let node_id = config.node_id.clone();
            let rx = cancel_rx.clone();
            tokio::spawn(async move {
                run_query_worker_poller(storage, query_ctx, node_id, rx).await;
            });
        }

        info!("Heartbeat, world view, and query worker background tasks started");

        Ok(Self {
            config,
            storage,
            registry,
            query_ctx,
            bee_pool,
            cell_cache,
            thermometer: ColonyThermometer::default(),
            abandonment_tracker: Arc::new(AbandonmentTracker::default()),
            heartbeat_writer,
            world_view,
            world_view_builder,
            cancel_tx,
        })
    }

    /// Gracefully stop the node: signal background tasks to exit, give them a
    /// short grace period, then delete this node's heartbeat so peers observe
    /// a deliberate departure rather than a crash.
    pub async fn shutdown(&self) {
        info!(node_id = %self.config.node_id, "Apiary node shutting down");

        // A send error only means every receiver is gone, i.e. the tasks
        // already exited — safe to ignore.
        let _ = self.cancel_tx.send(true);

        // Best-effort grace period for spawned tasks to observe cancellation;
        // anything still running afterwards is simply abandoned.
        tokio::time::sleep(Duration::from_millis(100)).await;

        if let Err(e) = self.heartbeat_writer.delete_heartbeat().await {
            tracing::warn!(error = %e, "Failed to delete heartbeat during shutdown");
        } else {
            info!(node_id = %self.config.node_id, "Heartbeat deleted (graceful departure)");
        }
    }

    /// Append `batch` to the frame at `hive/box_name/frame_name`.
    ///
    /// Writes the batch out as one or more cells (sized by this node's cell
    /// sizing policy), then commits an `AddCells` action to the frame's
    /// ledger. The ledger is created on demand when it cannot be opened.
    ///
    /// # Errors
    /// Fails when the frame is not registered, its schema cannot be parsed,
    /// the cells cannot be written, or the ledger commit fails.
    pub async fn write_to_frame(
        &self,
        hive: &str,
        box_name: &str,
        frame_name: &str,
        batch: &RecordBatch,
    ) -> Result<WriteResult> {
        let start = std::time::Instant::now();

        let frame = self.registry.get_frame(hive, box_name, frame_name).await?;
        let schema = FrameSchema::from_json_value(&frame.schema)?;
        let frame_path = format!("{}/{}/{}", hive, box_name, frame_name);

        // NOTE(review): ANY open error (not just "not found") falls through to
        // ledger creation here, which could mask a corrupt or unreadable
        // ledger — confirm this catch-all is intended.
        let mut ledger = match Ledger::open(Arc::clone(&self.storage), &frame_path).await {
            Ok(l) => l,
            Err(_) => {
                Ledger::create(
                    Arc::clone(&self.storage),
                    &frame_path,
                    schema.clone(),
                    frame.partition_by.clone(),
                    &self.config.node_id,
                )
                .await?
            }
        };

        let sizing = CellSizingPolicy::new(
            self.config.target_cell_size,
            self.config.max_cell_size,
            self.config.min_cell_size,
        );

        let writer = CellWriter::new(
            Arc::clone(&self.storage),
            frame_path,
            schema,
            frame.partition_by.clone(),
            sizing,
        );

        let cells = writer.write(batch).await?;

        // Capture stats before `cells` is moved into the commit action.
        let cells_written = cells.len();
        let rows_written: u64 = cells.iter().map(|c| c.rows).sum();
        let bytes_written: u64 = cells.iter().map(|c| c.bytes).sum();

        let version = ledger
            .commit(LedgerAction::AddCells { cells }, &self.config.node_id)
            .await?;

        let duration_ms = start.elapsed().as_millis() as u64;
        let temperature = self.thermometer.measure(&self.bee_pool).await;

        Ok(WriteResult {
            version,
            cells_written,
            rows_written,
            bytes_written,
            duration_ms,
            temperature,
        })
    }

    /// Read the frame's active cells, merged into a single `RecordBatch`.
    ///
    /// When `partition_filter` is given, only cells matching those partition
    /// key/value pairs are read. Returns `Ok(None)` when the frame has no
    /// ledger yet or no cells survive filtering.
    pub async fn read_from_frame(
        &self,
        hive: &str,
        box_name: &str,
        frame_name: &str,
        partition_filter: Option<&HashMap<String, String>>,
    ) -> Result<Option<RecordBatch>> {
        let frame_path = format!("{}/{}/{}", hive, box_name, frame_name);

        // A missing ledger is not an error — the frame was never written to.
        let ledger = match Ledger::open(Arc::clone(&self.storage), &frame_path).await {
            Ok(l) => l,
            Err(ApiaryError::NotFound { .. }) => return Ok(None),
            Err(e) => return Err(e),
        };

        let cells = if let Some(filter) = partition_filter {
            ledger.prune_cells(filter, &HashMap::new())
        } else {
            ledger.active_cells().iter().collect()
        };

        if cells.is_empty() {
            return Ok(None);
        }

        let reader = CellReader::new(Arc::clone(&self.storage), frame_path);
        reader.read_cells_merged(&cells, None).await
    }

    /// Replace the frame's entire contents with `batch`.
    ///
    /// Writes the batch as new cells, then commits a single `RewriteCells`
    /// action that removes every currently-active cell and adds the new ones
    /// atomically (one ledger version).
    ///
    /// # Errors
    /// Unlike `write_to_frame`, this requires the ledger to already exist:
    /// any `Ledger::open` failure is propagated.
    pub async fn overwrite_frame(
        &self,
        hive: &str,
        box_name: &str,
        frame_name: &str,
        batch: &RecordBatch,
    ) -> Result<WriteResult> {
        let start = std::time::Instant::now();

        let frame = self.registry.get_frame(hive, box_name, frame_name).await?;
        let schema = FrameSchema::from_json_value(&frame.schema)?;
        let frame_path = format!("{}/{}/{}", hive, box_name, frame_name);

        let mut ledger = Ledger::open(Arc::clone(&self.storage), &frame_path).await?;

        let sizing = CellSizingPolicy::new(
            self.config.target_cell_size,
            self.config.max_cell_size,
            self.config.min_cell_size,
        );

        let writer = CellWriter::new(
            Arc::clone(&self.storage),
            frame_path,
            schema,
            frame.partition_by.clone(),
            sizing,
        );

        let new_cells = writer.write(batch).await?;

        // Capture stats before `new_cells` is moved into the commit action.
        let cells_written = new_cells.len();
        let rows_written: u64 = new_cells.iter().map(|c| c.rows).sum();
        let bytes_written: u64 = new_cells.iter().map(|c| c.bytes).sum();

        // Everything active before this write gets removed in the same commit
        // that adds the replacement cells.
        let removed: Vec<_> = ledger.active_cells().iter().map(|c| c.id.clone()).collect();

        let version = ledger
            .commit(
                LedgerAction::RewriteCells {
                    removed,
                    added: new_cells,
                },
                &self.config.node_id,
            )
            .await?;

        let duration_ms = start.elapsed().as_millis() as u64;
        let temperature = self.thermometer.measure(&self.bee_pool).await;

        Ok(WriteResult {
            version,
            cells_written,
            rows_written,
            bytes_written,
            duration_ms,
            temperature,
        })
    }

    /// Create an empty ledger for a registered frame, using the frame's
    /// schema and partitioning from the registry.
    pub async fn init_frame_ledger(
        &self,
        hive: &str,
        box_name: &str,
        frame_name: &str,
    ) -> Result<()> {
        let frame = self.registry.get_frame(hive, box_name, frame_name).await?;
        let schema = FrameSchema::from_json_value(&frame.schema)?;
        let frame_path = format!("{}/{}/{}", hive, box_name, frame_name);

        Ledger::create(
            Arc::clone(&self.storage),
            &frame_path,
            schema,
            frame.partition_by.clone(),
            &self.config.node_id,
        )
        .await?;

        Ok(())
    }

    /// Execute a SQL query on a bee worker thread.
    ///
    /// The query is submitted to the bee pool so CPU-heavy work does not run
    /// on the async runtime's threads; the closure re-enters the runtime via
    /// `Handle::block_on` to drive the async query context.
    ///
    /// NOTE(review): this assumes bee workers are plain OS threads —
    /// `Handle::block_on` panics when called from within a tokio runtime
    /// worker thread. Confirm `BeePool::submit` guarantees that.
    ///
    /// # Errors
    /// Returns the query engine's error, or `ApiaryError::Internal` when the
    /// worker task is cancelled or panics (join error).
    pub async fn sql(&self, query: &str) -> Result<Vec<RecordBatch>> {
        let query_ctx = Arc::clone(&self.query_ctx);
        let query_owned = query.to_string();
        let rt_handle = tokio::runtime::Handle::current();

        let handle = self
            .bee_pool
            .submit(move || {
                rt_handle.block_on(async {
                    let mut ctx = query_ctx.lock().await;
                    ctx.sql(&query_owned).await
                })
            })
            .await;

        handle.await.map_err(|e| ApiaryError::Internal {
            message: format!("Task join error: {e}"),
        })?
    }

    /// Snapshot the status of every bee in the pool.
    pub async fn bee_status(&self) -> Vec<BeeStatus> {
        self.bee_pool.status().await
    }

    /// Clone of the most recent world view built by the background task.
    pub async fn world_view(&self) -> WorldView {
        self.world_view.read().await.clone()
    }

    /// Summarize the swarm from the current world view: per-node state and
    /// load, plus aggregate bee counts.
    pub async fn swarm_status(&self) -> SwarmStatus {
        let view = self.world_view.read().await;
        let mut nodes = Vec::new();

        for status in view.nodes.values() {
            nodes.push(SwarmNodeInfo {
                node_id: status.node_id.as_str().to_string(),
                state: match status.state {
                    NodeState::Alive => "alive".to_string(),
                    NodeState::Suspect => "suspect".to_string(),
                    NodeState::Dead => "dead".to_string(),
                },
                bees: status.heartbeat.load.bees_total,
                idle_bees: status.heartbeat.load.bees_idle,
                memory_pressure: status.heartbeat.load.memory_pressure,
                colony_temperature: status.heartbeat.load.colony_temperature,
            });
        }

        // Map iteration order is unspecified; sort for deterministic output.
        nodes.sort_by(|a, b| a.node_id.cmp(&b.node_id));

        let total_bees: usize = nodes.iter().map(|n| n.bees).sum();
        let total_idle_bees: usize = nodes.iter().map(|n| n.idle_bees).sum();

        SwarmStatus {
            nodes,
            total_bees,
            total_idle_bees,
        }
    }

    /// Measure the current colony temperature and report it alongside the
    /// thermometer's regulation mode and setpoint.
    pub async fn colony_status(&self) -> ColonyStatus {
        let temperature = self.thermometer.measure(&self.bee_pool).await;
        let regulation = self.thermometer.regulation(temperature);

        ColonyStatus {
            temperature,
            regulation: regulation.as_str().to_string(),
            setpoint: self.thermometer.setpoint(),
        }
    }

    /// Distributed SQL entry point; currently delegates to the local `sql()`
    /// path. Kept (and allowed as dead code) as the seam for scatter/gather
    /// execution.
    #[allow(dead_code)] pub async fn sql_distributed(&self, query: &str) -> Result<Vec<RecordBatch>> {
        self.sql(query).await
    }
}
552
/// Aggregate view of the swarm, as returned by [`ApiaryNode::swarm_status`].
#[derive(Debug, Clone)]
pub struct SwarmStatus {
    /// Per-node summaries, sorted by `node_id` for deterministic output.
    pub nodes: Vec<SwarmNodeInfo>,
    /// Total bees across all known nodes.
    pub total_bees: usize,
    /// Total idle bees across all known nodes.
    pub total_idle_bees: usize,
}
563
/// One node's entry in a [`SwarmStatus`], derived from its latest heartbeat.
#[derive(Debug, Clone)]
pub struct SwarmNodeInfo {
    // Identifier copied from the node's heartbeat.
    pub node_id: String,
    // One of "alive", "suspect", or "dead" (stringified `NodeState`).
    pub state: String,
    // Total bees reported by the node.
    pub bees: usize,
    // Idle bees reported by the node.
    pub idle_bees: usize,
    // Memory pressure from the node's heartbeat load report.
    pub memory_pressure: f64,
    // Colony temperature from the node's heartbeat load report.
    pub colony_temperature: f64,
}
574
/// Thermal-regulation snapshot, as returned by [`ApiaryNode::colony_status`].
#[derive(Debug, Clone)]
pub struct ColonyStatus {
    /// Current colony temperature measured from the bee pool.
    pub temperature: f64,
    /// Stringified regulation mode for the current temperature.
    pub regulation: String,
    /// The thermometer's target temperature.
    pub setpoint: f64,
}
585
/// Best-effort lookup of the current user's home directory.
///
/// Reads `USERPROFILE` on Windows and `HOME` everywhere else; yields `None`
/// when the variable is unset or not valid Unicode.
fn home_dir() -> Option<std::path::PathBuf> {
    // Pick the platform's conventional home-directory variable at compile time.
    #[cfg(target_os = "windows")]
    const HOME_VAR: &str = "USERPROFILE";
    #[cfg(not(target_os = "windows"))]
    const HOME_VAR: &str = "HOME";

    std::env::var(HOME_VAR).ok().map(std::path::PathBuf::from)
}
599
/// Background loop that executes this node's share of distributed queries.
///
/// Every 500 ms it lists manifests under `_queries/`, filters each manifest's
/// tasks down to those assigned to `node_id`, executes them through the shared
/// query context, and writes a partial-result object for the coordinator to
/// collect. Exits when `cancel` flips to `true`.
async fn run_query_worker_poller(
    storage: Arc<dyn StorageBackend>,
    query_ctx: Arc<tokio::sync::Mutex<ApiaryQueryContext>>,
    node_id: apiary_core::types::NodeId,
    cancel: tokio::sync::watch::Receiver<bool>,
) {
    use apiary_query::distributed;

    info!(node_id = %node_id, "Query worker poller started");

    let poll_interval = Duration::from_millis(500);

    loop {
        tokio::select! {
            _ = tokio::time::sleep(poll_interval) => {
                match storage.list("_queries/").await {
                    Ok(keys) => {
                        for key in keys {
                            // Only manifest files describe work; skip result
                            // files and any other artifacts under _queries/.
                            if !key.ends_with("/manifest.json") {
                                continue;
                            }

                            // Keys look like "_queries/<query_id>/manifest.json";
                            // the query id is the second path segment.
                            let parts: Vec<&str> = key.split('/').collect();
                            if parts.len() < 3 {
                                continue;
                            }
                            let query_id = parts[1];

                            match distributed::read_manifest(&storage, query_id).await {
                                Ok(manifest) => {
                                    let my_tasks: Vec<_> = manifest.tasks.iter()
                                        .filter(|t| t.node_id == node_id)
                                        .collect();

                                    if my_tasks.is_empty() {
                                        continue;
                                    }

                                    // Idempotence guard: a partial result already
                                    // on storage means this query was handled in
                                    // an earlier poll cycle.
                                    let partial_path = distributed::partial_result_path(query_id, &node_id);
                                    if storage.get(&partial_path).await.is_ok() {
                                        continue;
                                    }

                                    info!(
                                        query_id = %query_id,
                                        tasks = my_tasks.len(),
                                        "Executing distributed query tasks"
                                    );

                                    let mut results = Vec::new();
                                    // NOTE(review): the context lock is held for the
                                    // whole task batch AND the result upload below,
                                    // which blocks concurrent `sql()` calls until
                                    // this query finishes — confirm intended.
                                    let ctx = query_ctx.lock().await;

                                    for task in my_tasks {
                                        match ctx.execute_task(&task.sql_fragment, &task.cells).await {
                                            Ok(batches) => {
                                                results.extend(batches);
                                            }
                                            Err(e) => {
                                                // A failed task is logged and skipped;
                                                // surviving tasks still contribute
                                                // to the partial result.
                                                tracing::warn!(
                                                    task_id = %task.task_id,
                                                    error = %e,
                                                    "Task execution failed"
                                                );
                                            }
                                        }
                                    }

                                    if !results.is_empty() {
                                        if let Err(e) = distributed::write_partial_result(
                                            &storage,
                                            query_id,
                                            &node_id,
                                            &results,
                                        ).await {
                                            tracing::warn!(
                                                query_id = %query_id,
                                                error = %e,
                                                "Failed to write partial result"
                                            );
                                        }
                                    }
                                }
                                Err(_) => {
                                    // The manifest may have been deleted between
                                    // list and read; skip quietly and re-check on
                                    // the next poll.
                                    continue;
                                }
                            }
                        }
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "Failed to list query manifests");
                    }
                }
            }
            _ = wait_for_cancel(&cancel) => {
                tracing::debug!(node_id = %node_id, "Query worker poller stopping");
                break;
            }
        }
    }
}
711
712async fn wait_for_cancel(cancel: &tokio::sync::watch::Receiver<bool>) {
714 let mut rx = cancel.clone();
715 let _ = rx.wait_for(|&v| v).await;
716}
717
#[cfg(test)]
mod tests {
    use super::*;

    // A node started against a "local://" temp directory boots, detects at
    // least one core, and shuts down cleanly.
    #[tokio::test]
    async fn test_start_local_node() {
        let tmp = tempfile::TempDir::new().unwrap();
        let mut config = NodeConfig::detect("local://test");
        config.storage_uri = format!("local://{}", tmp.path().display());
        let node = ApiaryNode::start(config).await.unwrap();
        assert!(node.config.cores > 0);
        node.shutdown().await;
    }

    // A bare filesystem path (no "local://" scheme) is accepted as-is by the
    // storage-backend selection in `start`.
    #[tokio::test]
    async fn test_start_with_raw_path() {
        let tmp = tempfile::TempDir::new().unwrap();
        let mut config = NodeConfig::detect("test");
        config.storage_uri = tmp.path().to_string_lossy().to_string();
        let node = ApiaryNode::start(config).await.unwrap();
        node.shutdown().await;
    }
}