// apiary_runtime/node.rs
//! The Apiary node — a stateless compute instance in the swarm.
//!
//! [`ApiaryNode`] is the main runtime entry point. It initialises the
//! storage backend, detects system capacity, creates the bee pool,
//! starts the heartbeat writer and world view builder, and manages
//! the node lifecycle.

8use std::collections::HashMap;
9use std::sync::Arc;
10use std::time::Duration;
11
12use arrow::record_batch::RecordBatch;
13use tokio::sync::RwLock;
14use tracing::info;
15
16use apiary_core::config::NodeConfig;
17use apiary_core::error::ApiaryError;
18use apiary_core::registry_manager::RegistryManager;
19use apiary_core::storage::StorageBackend;
20use apiary_core::{CellSizingPolicy, FrameSchema, LedgerAction, Result, WriteResult};
21use apiary_query::ApiaryQueryContext;
22use apiary_storage::cell_reader::CellReader;
23use apiary_storage::cell_writer::CellWriter;
24use apiary_storage::ledger::Ledger;
25use apiary_storage::local::LocalBackend;
26use apiary_storage::s3::S3Backend;
27
28use crate::bee::{BeePool, BeeStatus};
29use crate::behavioral::{AbandonmentTracker, ColonyThermometer};
30use crate::cache::CellCache;
31use crate::heartbeat::{HeartbeatWriter, NodeState, WorldView, WorldViewBuilder};
32
/// An Apiary compute node — the runtime for one machine in the swarm.
///
/// The node holds a reference to the storage backend and its configuration.
/// In solo mode it uses [`LocalBackend`]; in multi-node mode it uses
/// [`S3Backend`]. The node is otherwise stateless — all committed state
/// lives in object storage.
pub struct ApiaryNode {
    /// Node configuration including auto-detected capacity.
    pub config: NodeConfig,

    /// The shared storage backend (object storage or local filesystem).
    pub storage: Arc<dyn StorageBackend>,

    /// Registry manager for DDL operations.
    pub registry: Arc<RegistryManager>,

    /// DataFusion-based SQL query context.
    ///
    /// Guarded by an async mutex — SQL execution and the distributed
    /// query worker serialise through this lock.
    pub query_ctx: Arc<tokio::sync::Mutex<ApiaryQueryContext>>,

    /// Pool of bees for isolated task execution.
    pub bee_pool: Arc<BeePool>,

    /// Local cell cache with LRU eviction.
    pub cell_cache: Arc<CellCache>,

    /// Colony thermometer for measuring system health.
    pub thermometer: ColonyThermometer,

    /// Abandonment tracker for task failure handling.
    pub abandonment_tracker: Arc<AbandonmentTracker>,

    /// Heartbeat writer for this node (also used to delete the
    /// heartbeat on graceful shutdown).
    heartbeat_writer: Arc<HeartbeatWriter>,

    /// Shared world view (updated by the background builder).
    world_view: Arc<RwLock<WorldView>>,

    /// World view builder (kept alive for on-demand cleanup).
    #[allow(dead_code)]
    world_view_builder: Arc<WorldViewBuilder>,

    /// Cancellation channel to stop background tasks on shutdown.
    /// Sending `true` stops the heartbeat writer, world view builder,
    /// and query worker poller.
    cancel_tx: tokio::sync::watch::Sender<bool>,
}
77
78impl ApiaryNode {
    /// Start a new Apiary node with the given configuration.
    ///
    /// Initialises the appropriate storage backend based on `config.storage_uri`
    /// (`s3://…` selects [`S3Backend`], anything else [`LocalBackend`]),
    /// loads or creates the registry with retries, builds the bee pool and
    /// cell cache, writes an initial heartbeat / world view synchronously,
    /// and spawns the three background tasks (heartbeat writer, world view
    /// builder, query worker poller).
    ///
    /// # Errors
    ///
    /// Returns an error if the storage backend cannot be initialised, the
    /// registry still fails after all retries, or the cell cache / initial
    /// heartbeat / initial world view setup fails.
    pub async fn start(config: NodeConfig) -> Result<Self> {
        // Select the storage backend from the URI scheme.
        let storage: Arc<dyn StorageBackend> = if config.storage_uri.starts_with("s3://") {
            Arc::new(S3Backend::new(&config.storage_uri)?)
        } else {
            // Parse local URI: "local://<path>" or treat as raw path
            let path = config
                .storage_uri
                .strip_prefix("local://")
                .unwrap_or(&config.storage_uri);

            // Expand ~ to home directory (accepts both "~/" and "~\" spellings).
            let expanded = if path.starts_with("~/") || path.starts_with("~\\") {
                let home = home_dir().ok_or_else(|| ApiaryError::Config {
                    message: "Cannot determine home directory".to_string(),
                })?;
                home.join(&path[2..])
            } else {
                std::path::PathBuf::from(path)
            };

            Arc::new(LocalBackend::new(expanded).await?)
        };

        info!(
            node_id = %config.node_id,
            cores = config.cores,
            memory_mb = config.memory_bytes / (1024 * 1024),
            memory_per_bee_mb = config.memory_per_bee / (1024 * 1024),
            target_cell_size_mb = config.target_cell_size / (1024 * 1024),
            storage_uri = %config.storage_uri,
            "Apiary node started"
        );

        // Initialize registry (retry with backoff for transient S3 errors)
        let registry = Arc::new(RegistryManager::new(Arc::clone(&storage)));
        {
            // Exponential backoff: 1s doubled per attempt, capped at 10s,
            // up to 10 attempts.
            let max_retries: u32 = 10;
            let mut delay = Duration::from_secs(1);
            let mut last_err = None;
            for attempt in 1..=max_retries {
                match registry.load_or_create().await {
                    Ok(_) => {
                        // Clear any earlier failure so it isn't returned below.
                        last_err = None;
                        break;
                    }
                    Err(e) => {
                        tracing::warn!(
                            attempt,
                            max_retries,
                            error = %e,
                            "Registry initialization failed, retrying"
                        );
                        last_err = Some(e);
                        if attempt < max_retries {
                            tokio::time::sleep(delay).await;
                            delay = (delay * 2).min(Duration::from_secs(10));
                        }
                    }
                }
            }
            // All attempts failed — surface the last error seen.
            if let Some(e) = last_err {
                return Err(e);
            }
        }
        info!("Registry loaded");

        // Initialize query context
        let query_ctx = Arc::new(tokio::sync::Mutex::new(ApiaryQueryContext::with_node_id(
            Arc::clone(&storage),
            Arc::clone(&registry),
            config.node_id.clone(),
        )));

        // Initialize bee pool
        let bee_pool = Arc::new(BeePool::new(&config));
        info!(bees = config.cores, "Bee pool initialized");

        // Initialize cell cache
        let cache_dir = config.cache_dir.join("cells");
        let cell_cache =
            Arc::new(CellCache::new(cache_dir, config.max_cache_size, Arc::clone(&storage)).await?);
        info!(
            max_cache_mb = config.max_cache_size / (1024 * 1024),
            "Cell cache initialized"
        );

        // Initialize heartbeat writer
        let heartbeat_writer = Arc::new(HeartbeatWriter::new(
            Arc::clone(&storage),
            &config,
            Arc::clone(&bee_pool),
            Arc::clone(&cell_cache),
        ));

        // Initialize world view builder
        let world_view_builder = Arc::new(WorldViewBuilder::new(
            Arc::clone(&storage),
            config.heartbeat_interval, // poll at same rate as heartbeat
            config.dead_threshold,
        ));
        let world_view = world_view_builder.world_view();

        // Write initial heartbeat and build initial world view synchronously
        // so that swarm_status() works immediately after start().
        heartbeat_writer.write_once().await?;
        world_view_builder.poll_once().await?;
        info!("Initial heartbeat written and world view built");

        // Create cancellation channel; shutdown() sends `true` on it.
        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);

        // Start heartbeat writer background task
        {
            let writer = Arc::clone(&heartbeat_writer);
            let rx = cancel_rx.clone();
            tokio::spawn(async move {
                writer.run(rx).await;
            });
        }

        // Start world view builder background task
        {
            let builder = Arc::clone(&world_view_builder);
            let rx = cancel_rx.clone();
            tokio::spawn(async move {
                builder.run(rx).await;
            });
        }

        // Start query worker task poller (for distributed execution)
        {
            let storage = Arc::clone(&storage);
            let query_ctx = Arc::clone(&query_ctx);
            let node_id = config.node_id.clone();
            let rx = cancel_rx.clone();
            tokio::spawn(async move {
                run_query_worker_poller(storage, query_ctx, node_id, rx).await;
            });
        }

        info!("Heartbeat, world view, and query worker background tasks started");

        Ok(Self {
            config,
            storage,
            registry,
            query_ctx,
            bee_pool,
            cell_cache,
            thermometer: ColonyThermometer::default(),
            abandonment_tracker: Arc::new(AbandonmentTracker::default()),
            heartbeat_writer,
            world_view,
            world_view_builder,
            cancel_tx,
        })
    }
240
241    /// Gracefully shut down the node.
242    ///
243    /// Stops background tasks (heartbeat writer, world view builder),
244    /// deletes the heartbeat file, and cleans up resources.
245    pub async fn shutdown(&self) {
246        info!(node_id = %self.config.node_id, "Apiary node shutting down");
247
248        // Signal background tasks to stop
249        let _ = self.cancel_tx.send(true);
250
251        // Allow background tasks a moment to stop
252        tokio::time::sleep(Duration::from_millis(100)).await;
253
254        // Delete our heartbeat file (graceful departure)
255        if let Err(e) = self.heartbeat_writer.delete_heartbeat().await {
256            tracing::warn!(error = %e, "Failed to delete heartbeat during shutdown");
257        } else {
258            info!(node_id = %self.config.node_id, "Heartbeat deleted (graceful departure)");
259        }
260    }
261
262    /// Write data to a frame. This is the end-to-end write path:
263    /// 1. Resolve frame from registry
264    /// 2. Open/create ledger
265    /// 3. Validate schema
266    /// 4. Partition data
267    /// 5. Write cells to storage
268    /// 6. Commit ledger entry
269    pub async fn write_to_frame(
270        &self,
271        hive: &str,
272        box_name: &str,
273        frame_name: &str,
274        batch: &RecordBatch,
275    ) -> Result<WriteResult> {
276        let start = std::time::Instant::now();
277
278        // Resolve frame metadata
279        let frame = self.registry.get_frame(hive, box_name, frame_name).await?;
280        let schema = FrameSchema::from_json_value(&frame.schema)?;
281        let frame_path = format!("{}/{}/{}", hive, box_name, frame_name);
282
283        // Open or create ledger
284        let mut ledger = match Ledger::open(Arc::clone(&self.storage), &frame_path).await {
285            Ok(l) => l,
286            Err(_) => {
287                Ledger::create(
288                    Arc::clone(&self.storage),
289                    &frame_path,
290                    schema.clone(),
291                    frame.partition_by.clone(),
292                    &self.config.node_id,
293                )
294                .await?
295            }
296        };
297
298        // Write cells
299        let sizing = CellSizingPolicy::new(
300            self.config.target_cell_size,
301            self.config.max_cell_size,
302            self.config.min_cell_size,
303        );
304
305        let writer = CellWriter::new(
306            Arc::clone(&self.storage),
307            frame_path,
308            schema,
309            frame.partition_by.clone(),
310            sizing,
311        );
312
313        let cells = writer.write(batch).await?;
314
315        let cells_written = cells.len();
316        let rows_written: u64 = cells.iter().map(|c| c.rows).sum();
317        let bytes_written: u64 = cells.iter().map(|c| c.bytes).sum();
318
319        // Commit to ledger
320        let version = ledger
321            .commit(LedgerAction::AddCells { cells }, &self.config.node_id)
322            .await?;
323
324        let duration_ms = start.elapsed().as_millis() as u64;
325        let temperature = self.thermometer.measure(&self.bee_pool).await;
326
327        Ok(WriteResult {
328            version,
329            cells_written,
330            rows_written,
331            bytes_written,
332            duration_ms,
333            temperature,
334        })
335    }
336
337    /// Read data from a frame, optionally filtering by partition values.
338    /// Returns all matching data as a merged RecordBatch.
339    pub async fn read_from_frame(
340        &self,
341        hive: &str,
342        box_name: &str,
343        frame_name: &str,
344        partition_filter: Option<&HashMap<String, String>>,
345    ) -> Result<Option<RecordBatch>> {
346        let frame_path = format!("{}/{}/{}", hive, box_name, frame_name);
347
348        let ledger = match Ledger::open(Arc::clone(&self.storage), &frame_path).await {
349            Ok(l) => l,
350            Err(ApiaryError::NotFound { .. }) => return Ok(None),
351            Err(e) => return Err(e),
352        };
353
354        let cells = if let Some(filter) = partition_filter {
355            ledger.prune_cells(filter, &HashMap::new())
356        } else {
357            ledger.active_cells().iter().collect()
358        };
359
360        if cells.is_empty() {
361            return Ok(None);
362        }
363
364        let reader = CellReader::new(Arc::clone(&self.storage), frame_path);
365        reader.read_cells_merged(&cells, None).await
366    }
367
368    /// Overwrite all data in a frame with new data.
369    /// Commits a RewriteCells entry removing all existing cells and adding new ones.
370    pub async fn overwrite_frame(
371        &self,
372        hive: &str,
373        box_name: &str,
374        frame_name: &str,
375        batch: &RecordBatch,
376    ) -> Result<WriteResult> {
377        let start = std::time::Instant::now();
378
379        let frame = self.registry.get_frame(hive, box_name, frame_name).await?;
380        let schema = FrameSchema::from_json_value(&frame.schema)?;
381        let frame_path = format!("{}/{}/{}", hive, box_name, frame_name);
382
383        let mut ledger = Ledger::open(Arc::clone(&self.storage), &frame_path).await?;
384
385        let sizing = CellSizingPolicy::new(
386            self.config.target_cell_size,
387            self.config.max_cell_size,
388            self.config.min_cell_size,
389        );
390
391        let writer = CellWriter::new(
392            Arc::clone(&self.storage),
393            frame_path,
394            schema,
395            frame.partition_by.clone(),
396            sizing,
397        );
398
399        let new_cells = writer.write(batch).await?;
400
401        let cells_written = new_cells.len();
402        let rows_written: u64 = new_cells.iter().map(|c| c.rows).sum();
403        let bytes_written: u64 = new_cells.iter().map(|c| c.bytes).sum();
404
405        // Remove all old cells, add new ones
406        let removed: Vec<_> = ledger.active_cells().iter().map(|c| c.id.clone()).collect();
407
408        let version = ledger
409            .commit(
410                LedgerAction::RewriteCells {
411                    removed,
412                    added: new_cells,
413                },
414                &self.config.node_id,
415            )
416            .await?;
417
418        let duration_ms = start.elapsed().as_millis() as u64;
419        let temperature = self.thermometer.measure(&self.bee_pool).await;
420
421        Ok(WriteResult {
422            version,
423            cells_written,
424            rows_written,
425            bytes_written,
426            duration_ms,
427            temperature,
428        })
429    }
430
431    /// Initialize the ledger for a frame (called after create_frame in registry).
432    pub async fn init_frame_ledger(
433        &self,
434        hive: &str,
435        box_name: &str,
436        frame_name: &str,
437    ) -> Result<()> {
438        let frame = self.registry.get_frame(hive, box_name, frame_name).await?;
439        let schema = FrameSchema::from_json_value(&frame.schema)?;
440        let frame_path = format!("{}/{}/{}", hive, box_name, frame_name);
441
442        Ledger::create(
443            Arc::clone(&self.storage),
444            &frame_path,
445            schema,
446            frame.partition_by.clone(),
447            &self.config.node_id,
448        )
449        .await?;
450
451        Ok(())
452    }
453
    /// Execute a SQL query and return results as RecordBatches.
    ///
    /// The query is executed through the BeePool — assigned to an idle bee
    /// or queued if all bees are busy. Each bee runs in its own sealed
    /// chamber with memory budget and timeout enforcement.
    ///
    /// Supports:
    /// - Standard SQL (SELECT, GROUP BY, ORDER BY, etc.) over frames
    /// - Custom commands: USE HIVE, USE BOX, SHOW HIVES, SHOW BOXES, SHOW FRAMES, DESCRIBE
    /// - 3-part table names: hive.box.frame
    /// - 1-part names after USE HIVE / USE BOX
    ///
    /// Note: all queries serialise on the single `query_ctx` mutex, so
    /// concurrent `sql` calls queue on that lock as well as the bee pool.
    pub async fn sql(&self, query: &str) -> Result<Vec<RecordBatch>> {
        // Clone the shared context handle and capture the runtime handle so
        // the (synchronous) bee closure can re-enter the async world.
        let query_ctx = Arc::clone(&self.query_ctx);
        let query_owned = query.to_string();
        let rt_handle = tokio::runtime::Handle::current();

        let handle = self
            .bee_pool
            .submit(move || {
                // NOTE(review): `Handle::block_on` panics when called from a
                // tokio runtime worker thread — this assumes BeePool runs
                // closures on dedicated OS threads; confirm in `bee.rs`.
                rt_handle.block_on(async {
                    let mut ctx = query_ctx.lock().await;
                    ctx.sql(&query_owned).await
                })
            })
            .await;

        // Surface a panicked or cancelled bee task as an internal error.
        handle.await.map_err(|e| ApiaryError::Internal {
            message: format!("Task join error: {e}"),
        })?
    }
484
    /// Return the status of each bee in the pool.
    ///
    /// One entry per bee; see [`BeeStatus`] for the reported fields.
    pub async fn bee_status(&self) -> Vec<BeeStatus> {
        self.bee_pool.status().await
    }
489
    /// Return the current world view snapshot.
    ///
    /// The snapshot is a clone of the view maintained by the background
    /// [`WorldViewBuilder`] task, so it may lag storage by up to one
    /// poll interval.
    pub async fn world_view(&self) -> WorldView {
        self.world_view.read().await.clone()
    }
494
495    /// Return swarm status: a summary of all nodes visible to this node.
496    pub async fn swarm_status(&self) -> SwarmStatus {
497        let view = self.world_view.read().await;
498        let mut nodes = Vec::new();
499
500        for status in view.nodes.values() {
501            nodes.push(SwarmNodeInfo {
502                node_id: status.node_id.as_str().to_string(),
503                state: match status.state {
504                    NodeState::Alive => "alive".to_string(),
505                    NodeState::Suspect => "suspect".to_string(),
506                    NodeState::Dead => "dead".to_string(),
507                },
508                bees: status.heartbeat.load.bees_total,
509                idle_bees: status.heartbeat.load.bees_idle,
510                memory_pressure: status.heartbeat.load.memory_pressure,
511                colony_temperature: status.heartbeat.load.colony_temperature,
512            });
513        }
514
515        // Sort by node_id for deterministic output
516        nodes.sort_by(|a, b| a.node_id.cmp(&b.node_id));
517
518        let total_bees: usize = nodes.iter().map(|n| n.bees).sum();
519        let total_idle_bees: usize = nodes.iter().map(|n| n.idle_bees).sum();
520
521        SwarmStatus {
522            nodes,
523            total_bees,
524            total_idle_bees,
525        }
526    }
527
528    /// Return the current colony status: temperature and regulation state.
529    pub async fn colony_status(&self) -> ColonyStatus {
530        let temperature = self.thermometer.measure(&self.bee_pool).await;
531        let regulation = self.thermometer.regulation(temperature);
532
533        ColonyStatus {
534            temperature,
535            regulation: regulation.as_str().to_string(),
536            setpoint: self.thermometer.setpoint(),
537        }
538    }
539
    /// Execute a distributed query (v2 feature - explicit control).
    ///
    /// This method is reserved for v2 when users want explicit control over
    /// distributed execution strategy. In v1, distributed execution happens
    /// transparently within the query context based on query planning, so
    /// this currently delegates straight to [`ApiaryNode::sql`].
    #[allow(dead_code)] // Reserved for v2 explicit distribution control
    pub async fn sql_distributed(&self, query: &str) -> Result<Vec<RecordBatch>> {
        // v1: Distributed execution is transparent in query context
        // v2: This will allow explicit control over distribution strategy
        self.sql(query).await
    }
551}
552
/// Summary of the swarm as seen by this node.
///
/// Produced by [`ApiaryNode::swarm_status`]; `nodes` is sorted by
/// `node_id` for deterministic output.
#[derive(Debug, Clone)]
pub struct SwarmStatus {
    /// Info for each known node.
    pub nodes: Vec<SwarmNodeInfo>,
    /// Total bees across all nodes.
    pub total_bees: usize,
    /// Total idle bees across all nodes.
    pub total_idle_bees: usize,
}
563
/// Info about a single node in the swarm.
#[derive(Debug, Clone)]
pub struct SwarmNodeInfo {
    /// Unique identifier of the node.
    pub node_id: String,
    /// Liveness state: "alive", "suspect", or "dead".
    pub state: String,
    /// Total bees (worker slots) reported by the node's heartbeat.
    pub bees: usize,
    /// Idle bees reported by the node's heartbeat.
    pub idle_bees: usize,
    /// Memory pressure reported by the node's heartbeat.
    pub memory_pressure: f64,
    /// Colony temperature reported by the node's heartbeat.
    pub colony_temperature: f64,
}
574
/// Colony temperature and regulation status.
///
/// Produced by [`ApiaryNode::colony_status`].
#[derive(Debug, Clone)]
pub struct ColonyStatus {
    /// Current colony temperature (0.0 to 1.0).
    pub temperature: f64,
    /// Regulation state: "cold", "ideal", "warm", "hot", or "critical".
    pub regulation: String,
    /// Temperature setpoint.
    pub setpoint: f64,
}
585
/// Best-effort home directory detection.
///
/// Reads `USERPROFILE` on Windows and `HOME` everywhere else; returns
/// `None` when the relevant variable is unset or not valid Unicode.
fn home_dir() -> Option<std::path::PathBuf> {
    // `cfg!` is a compile-time constant, so the unused branch is
    // optimised away; both arms compile on every platform.
    let var_name = if cfg!(target_os = "windows") {
        "USERPROFILE"
    } else {
        "HOME"
    };
    std::env::var(var_name).ok().map(std::path::PathBuf::from)
}
599
/// Background task that polls for distributed query manifests and executes assigned tasks.
///
/// Every 500ms this lists `_queries/`, reads each `manifest.json`, and —
/// when the manifest assigns tasks to this node and no partial result has
/// been written yet — executes the assigned SQL fragments and uploads a
/// partial result. Loops until the cancellation flag becomes `true`.
async fn run_query_worker_poller(
    storage: Arc<dyn StorageBackend>,
    query_ctx: Arc<tokio::sync::Mutex<ApiaryQueryContext>>,
    node_id: apiary_core::types::NodeId,
    cancel: tokio::sync::watch::Receiver<bool>,
) {
    use apiary_query::distributed;

    info!(node_id = %node_id, "Query worker poller started");

    let poll_interval = Duration::from_millis(500);

    loop {
        tokio::select! {
            _ = tokio::time::sleep(poll_interval) => {
                // List query manifests
                match storage.list("_queries/").await {
                    Ok(keys) => {
                        // Find manifest files
                        for key in keys {
                            if !key.ends_with("/manifest.json") {
                                continue;
                            }

                            // Extract query_id from path: _queries/{query_id}/manifest.json
                            let parts: Vec<&str> = key.split('/').collect();
                            if parts.len() < 3 {
                                continue;
                            }
                            let query_id = parts[1];

                            // Try to read the manifest
                            match distributed::read_manifest(&storage, query_id).await {
                                Ok(manifest) => {
                                    // Check if any tasks are assigned to this node
                                    let my_tasks: Vec<_> = manifest.tasks.iter()
                                        .filter(|t| t.node_id == node_id)
                                        .collect();

                                    if my_tasks.is_empty() {
                                        continue;
                                    }

                                    // Check if we've already written our partial result.
                                    // NOTE(review): any `get` error (not just NotFound) is
                                    // treated as "not yet written", so a transient storage
                                    // error re-runs the tasks — confirm task idempotence.
                                    let partial_path = distributed::partial_result_path(query_id, &node_id);
                                    if storage.get(&partial_path).await.is_ok() {
                                        // Already completed
                                        continue;
                                    }

                                    // Execute tasks and write partial result
                                    info!(
                                        query_id = %query_id,
                                        tasks = my_tasks.len(),
                                        "Executing distributed query tasks"
                                    );

                                    let mut results = Vec::new();
                                    // The context lock is held for the whole batch, so
                                    // task execution serialises with interactive `sql`
                                    // calls on this node.
                                    let ctx = query_ctx.lock().await;

                                    for task in my_tasks {
                                        match ctx.execute_task(&task.sql_fragment, &task.cells).await {
                                            Ok(batches) => {
                                                results.extend(batches);
                                            }
                                            Err(e) => {
                                                // A failed task is logged and skipped; the
                                                // partial result simply omits its rows.
                                                tracing::warn!(
                                                    task_id = %task.task_id,
                                                    error = %e,
                                                    "Task execution failed"
                                                );
                                            }
                                        }
                                    }

                                    // Write partial result
                                    if !results.is_empty() {
                                        if let Err(e) = distributed::write_partial_result(
                                            &storage,
                                            query_id,
                                            &node_id,
                                            &results,
                                        ).await {
                                            tracing::warn!(
                                                query_id = %query_id,
                                                error = %e,
                                                "Failed to write partial result"
                                            );
                                        }
                                    }
                                }
                                Err(_) => {
                                    // Manifest not readable yet or deleted
                                    continue;
                                }
                            }
                        }
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "Failed to list query manifests");
                    }
                }
            }
            _ = wait_for_cancel(&cancel) => {
                tracing::debug!(node_id = %node_id, "Query worker poller stopping");
                break;
            }
        }
    }
}
711
712/// Helper to wait for cancellation.
713async fn wait_for_cancel(cancel: &tokio::sync::watch::Receiver<bool>) {
714    let mut rx = cancel.clone();
715    let _ = rx.wait_for(|&v| v).await;
716}
717
#[cfg(test)]
mod tests {
    use super::*;

    // Spins up a full node against a temp-dir local backend and checks
    // that capacity auto-detection found at least one core.
    #[tokio::test]
    async fn test_start_local_node() {
        let tmp = tempfile::TempDir::new().unwrap();
        let mut config = NodeConfig::detect("local://test");
        config.storage_uri = format!("local://{}", tmp.path().display());
        let node = ApiaryNode::start(config).await.unwrap();
        assert!(node.config.cores > 0);
        node.shutdown().await;
    }

    // A bare filesystem path (no "local://" scheme) must also select the
    // local backend.
    #[tokio::test]
    async fn test_start_with_raw_path() {
        let tmp = tempfile::TempDir::new().unwrap();
        let mut config = NodeConfig::detect("test");
        config.storage_uri = tmp.path().to_string_lossy().to_string();
        let node = ApiaryNode::start(config).await.unwrap();
        node.shutdown().await;
    }
}