//! `hashtree_cli/daemon.rs` — embedded daemon: wires the hashtree store,
//! social graph, Nostr relay, optional p2p peer router, and background
//! services into a single in-process HTTP server.
1use anyhow::{Context, Result};
2use axum::Router;
3use nostr::nips::nip19::ToBech32;
4use nostr::Keys;
5use std::collections::HashSet;
6use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
7use std::path::PathBuf;
8use std::sync::Arc;
9use tokio::net::TcpListener;
10use tokio::sync::Mutex;
11use tokio::task::JoinHandle;
12use tower_http::cors::CorsLayer;
13
14use crate::config::{ensure_keys, parse_npub, pubkey_bytes, Config};
15use crate::eviction::{spawn_background_eviction_task, BACKGROUND_EVICTION_INTERVAL};
16use crate::nostr_relay::{NostrRelay, NostrRelayConfig};
17use crate::server::{AppState, HashtreeServer};
18use crate::socialgraph;
19use crate::storage::HashtreeStore;
20
21#[cfg(feature = "p2p")]
22use crate::webrtc::{ContentStore, PeerClassifier, PeerRouter, WebRTCState};
23#[cfg(not(feature = "p2p"))]
24use crate::WebRTCState;
25
/// Handle to a running peer-router task: a watch sender used to request
/// shutdown, plus the task's `JoinHandle` so it can be joined or aborted.
#[cfg(feature = "p2p")]
struct PeerRouterRuntime {
    // Sending `true` asks the router loop to stop.
    shutdown: Arc<tokio::sync::watch::Sender<bool>>,
    join: JoinHandle<()>,
}
31
/// Owns the background sync service together with the task driving it.
struct BackgroundSyncRuntime {
    service: Arc<crate::sync::BackgroundSync>,
    // `None` after the handle has been taken for a graceful join.
    join: Option<JoinHandle<()>>,
}
36
37impl Drop for BackgroundSyncRuntime {
38    fn drop(&mut self) {
39        self.service.shutdown();
40        if let Some(join) = self.join.take() {
41            join.abort();
42        }
43    }
44}
45
/// Owns the background Nostr mirror service together with the task driving it.
struct BackgroundMirrorRuntime {
    service: Arc<crate::nostr_mirror::BackgroundNostrMirror>,
    // `None` after the handle has been taken for a graceful join.
    join: Option<JoinHandle<()>>,
}
50
51impl Drop for BackgroundMirrorRuntime {
52    fn drop(&mut self) {
53        self.service.shutdown();
54        if let Some(join) = self.join.take() {
55            join.abort();
56        }
57    }
58}
59
/// Aggregate of the optional background services; each slot is `Some` only
/// while that service is running.
struct BackgroundServicesRuntime {
    crawler: Option<socialgraph::crawler::SocialGraphTaskHandles>,
    mirror: Option<BackgroundMirrorRuntime>,
    sync: Option<BackgroundSyncRuntime>,
}
65
66impl Drop for BackgroundServicesRuntime {
67    fn drop(&mut self) {
68        if let Some(handles) = self.crawler.as_ref() {
69            let _ = handles.shutdown_tx.send(true);
70        }
71        if let Some(runtime) = self.mirror.as_ref() {
72            runtime.service.shutdown();
73        }
74        if let Some(runtime) = self.sync.as_ref() {
75            runtime.service.shutdown();
76        }
77    }
78}
79
80impl BackgroundServicesRuntime {
81    fn status(&self) -> EmbeddedBackgroundServicesStatus {
82        EmbeddedBackgroundServicesStatus {
83            crawler_active: self.crawler.is_some(),
84            mirror_active: self.mirror.is_some(),
85            sync_active: self.sync.is_some(),
86        }
87    }
88}
89
/// Which embedded background services are currently running.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmbeddedBackgroundServicesStatus {
    pub crawler_active: bool,
    pub mirror_active: bool,
    pub sync_active: bool,
}
96
/// Starts, reconfigures, and stops the embedded background services
/// (social-graph crawler, Nostr mirror, content sync).
pub struct EmbeddedBackgroundServicesController {
    keys: Keys,
    data_dir: PathBuf,
    store: Arc<HashtreeStore>,
    // Concrete store handle, needed where the mirror requires the concrete type.
    graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
    // Trait-object view of the same graph, used by the crawler.
    graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
    spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
    webrtc_state: Option<Arc<WebRTCState>>,
    // Guards the running-service handles across concurrent apply/shutdown calls.
    runtime: Mutex<BackgroundServicesRuntime>,
}
107
impl EmbeddedBackgroundServicesController {
    /// Derive a dialable local websocket relay URL (`ws://<host>:<port>/ws`)
    /// from the server bind address.
    ///
    /// Returns `None` when the bind address does not parse as a socket
    /// address or the port is 0 (not yet assigned). Unspecified addresses
    /// (0.0.0.0 / ::) are rewritten to the matching localhost address.
    fn local_publish_relay(bind_address: &str) -> Option<String> {
        let addr: SocketAddr = bind_address.parse().ok()?;
        if addr.port() == 0 {
            return None;
        }

        let host = match addr.ip() {
            IpAddr::V4(ip) if ip.is_unspecified() => Ipv4Addr::LOCALHOST.to_string(),
            IpAddr::V6(ip) if ip.is_unspecified() => format!("[{}]", Ipv6Addr::LOCALHOST),
            IpAddr::V4(ip) => ip.to_string(),
            // IPv6 hosts must be bracketed inside a URL authority.
            IpAddr::V6(ip) => format!("[{ip}]"),
        };
        Some(format!("ws://{host}:{}/ws", addr.port()))
    }

    /// Build a controller with no services running; call `apply_config` to
    /// start them.
    pub fn new(
        keys: Keys,
        data_dir: PathBuf,
        store: Arc<HashtreeStore>,
        graph_store_concrete: Arc<socialgraph::SocialGraphStore>,
        graph_store: Arc<dyn socialgraph::SocialGraphBackend>,
        spambox: Option<Arc<dyn socialgraph::SocialGraphBackend>>,
        webrtc_state: Option<Arc<WebRTCState>>,
    ) -> Self {
        Self {
            keys,
            data_dir,
            store,
            graph_store_concrete,
            graph_store,
            spambox,
            webrtc_state,
            runtime: Mutex::new(BackgroundServicesRuntime {
                crawler: None,
                mirror: None,
                sync: None,
            }),
        }
    }

    /// Report which background services currently hold a runtime handle.
    pub async fn status(&self) -> EmbeddedBackgroundServicesStatus {
        self.runtime.lock().await.status()
    }

    /// Stop all background services, giving each up to 3s to exit cleanly
    /// before its task is aborted.
    pub async fn shutdown(&self) {
        let mut runtime = self.runtime.lock().await;
        Self::shutdown_crawler(&mut runtime.crawler).await;
        Self::shutdown_mirror(&mut runtime.mirror).await;
        Self::shutdown_sync(&mut runtime.sync).await;
    }

    /// Stop the social-graph crawler: signal shutdown, then wait up to 3s
    /// for each of its two tasks, aborting on timeout. No-op if not running.
    async fn shutdown_crawler(crawler: &mut Option<socialgraph::crawler::SocialGraphTaskHandles>) {
        let Some(handles) = crawler.take() else {
            return;
        };

        // Receiver side ignores the result: the task may already be gone.
        let _ = handles.shutdown_tx.send(true);

        let mut crawl_handle = handles.crawl_handle;
        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut crawl_handle).await {
            Ok(Ok(())) => {}
            Ok(Err(err)) => tracing::warn!("Crawler task ended with join error: {}", err),
            Err(_) => {
                tracing::warn!("Timed out waiting for crawler task shutdown");
                crawl_handle.abort();
            }
        }

        let mut local_list_handle = handles.local_list_handle;
        match tokio::time::timeout(std::time::Duration::from_secs(3), &mut local_list_handle).await
        {
            Ok(Ok(())) => {}
            Ok(Err(err)) => tracing::warn!("Local list task ended with join error: {}", err),
            Err(_) => {
                tracing::warn!("Timed out waiting for local list task shutdown");
                local_list_handle.abort();
            }
        }
    }

    /// Stop the background sync service: request shutdown, wait up to 3s for
    /// the driver task, abort on timeout. No-op if not running.
    async fn shutdown_sync(sync: &mut Option<BackgroundSyncRuntime>) {
        let Some(mut runtime) = sync.take() else {
            return;
        };

        runtime.service.shutdown();
        if let Some(mut join) = runtime.join.take() {
            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
                Ok(Ok(())) => {}
                Ok(Err(err)) => {
                    tracing::warn!("Background sync task ended with join error: {}", err)
                }
                Err(_) => {
                    tracing::warn!("Timed out waiting for background sync shutdown");
                    join.abort();
                }
            }
        }
    }

    /// Stop the background Nostr mirror: request shutdown, wait up to 3s for
    /// the driver task, abort on timeout. No-op if not running.
    async fn shutdown_mirror(mirror: &mut Option<BackgroundMirrorRuntime>) {
        let Some(mut runtime) = mirror.take() else {
            return;
        };

        runtime.service.shutdown();
        if let Some(mut join) = runtime.join.take() {
            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
                Ok(Ok(())) => {}
                Ok(Err(err)) => {
                    tracing::warn!("Background mirror task ended with join error: {}", err)
                }
                Err(_) => {
                    tracing::warn!("Timed out waiting for background mirror shutdown");
                    join.abort();
                }
            }
        }
    }

    /// Restart all background services to match `config`.
    ///
    /// Existing services are always stopped first; each service is then
    /// started only when its enabling conditions hold (Nostr enabled, crawl
    /// depth > 0, non-empty relay list for crawler+mirror; sync enabled with
    /// at least one sync mode for the sync service).
    ///
    /// Returns the resulting status snapshot.
    pub async fn apply_config(&self, config: &Config) -> Result<EmbeddedBackgroundServicesStatus> {
        let mut runtime = self.runtime.lock().await;

        Self::shutdown_crawler(&mut runtime.crawler).await;
        Self::shutdown_mirror(&mut runtime.mirror).await;
        Self::shutdown_sync(&mut runtime.sync).await;

        let active_relays = config.nostr.active_relays();

        if config.nostr.enabled
            && config.nostr.social_graph_crawl_depth > 0
            && !active_relays.is_empty()
        {
            runtime.crawler = Some(socialgraph::crawler::spawn_social_graph_tasks(
                self.graph_store.clone(),
                self.keys.clone(),
                active_relays.clone(),
                config.nostr.social_graph_crawl_depth,
                self.spambox.clone(),
                self.data_dir.clone(),
            ));

            let service = Arc::new(
                crate::nostr_mirror::BackgroundNostrMirror::new(
                    crate::nostr_mirror::NostrMirrorConfig {
                        relays: active_relays.clone(),
                        // Publish back to our own relay only when its address
                        // is concrete (see `local_publish_relay`).
                        publish_relays: Self::local_publish_relay(&config.server.bind_address)
                            .into_iter()
                            .collect(),
                        max_follow_distance: config.nostr.social_graph_crawl_depth,
                        overmute_threshold: config.nostr.overmute_threshold,
                        require_negentropy: config.nostr.negentropy_only,
                        kinds: config.nostr.mirror_kinds.clone(),
                        // `.max(1)` guards against a configured chunk size of 0.
                        history_sync_author_chunk_size: config
                            .nostr
                            .history_sync_author_chunk_size
                            .max(1),
                        // NOTE(review): this reuses history_sync_author_chunk_size —
                        // confirm it is intentional and not a copy-paste of the
                        // field above (a dedicated config knob may be intended).
                        missing_profile_backfill_batch_size: config
                            .nostr
                            .history_sync_author_chunk_size
                            .max(1),
                        history_sync_on_reconnect: config.nostr.history_sync_on_reconnect,
                        ..crate::nostr_mirror::NostrMirrorConfig::default()
                    },
                    self.store.clone(),
                    self.graph_store_concrete.clone(),
                    // Round-trip our secret key through bech32 to obtain
                    // nostr_sdk keys from the nostr crate's `Keys`.
                    Some(
                        nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
                            .context("Failed to parse keys for background nostr mirror")?,
                    ),
                )
                .await
                .context("Failed to create background nostr mirror")?,
            );
            let service_for_task = service.clone();
            // Drive the mirror on its own current-thread runtime inside a
            // blocking-pool thread, keeping it off the main runtime's workers.
            let join = tokio::task::spawn_blocking(move || {
                let runtime = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .expect("build background nostr mirror runtime");
                runtime.block_on(async {
                    if let Err(err) = service_for_task.run().await {
                        tracing::error!("Background nostr mirror error: {:#}", err);
                    }
                });
            });
            runtime.mirror = Some(BackgroundMirrorRuntime {
                service,
                join: Some(join),
            });
        }

        if config.sync.enabled
            && (config.sync.sync_own || config.sync.sync_followed)
            && !active_relays.is_empty()
        {
            let sync_config = crate::sync::SyncConfig {
                sync_own: config.sync.sync_own,
                sync_followed: config.sync.sync_followed,
                relays: active_relays,
                max_concurrent: config.sync.max_concurrent,
                webrtc_timeout_ms: config.sync.webrtc_timeout_ms,
                blossom_timeout_ms: config.sync.blossom_timeout_ms,
            };

            let sync_keys = nostr_sdk::Keys::parse(&self.keys.secret_key().to_bech32()?)
                .context("Failed to parse keys for sync")?;
            let service = Arc::new(
                crate::sync::BackgroundSync::new(
                    sync_config,
                    self.store.clone(),
                    sync_keys,
                    self.webrtc_state.clone(),
                )
                .await
                .context("Failed to create background sync service")?,
            );
            let contacts_file = self.data_dir.join("contacts.json");
            let service_for_task = service.clone();
            let join = tokio::spawn(async move {
                if let Err(err) = service_for_task.run(contacts_file).await {
                    tracing::error!("Background sync error: {}", err);
                }
            });
            runtime.sync = Some(BackgroundSyncRuntime {
                service,
                join: Some(join),
            });
        }

        Ok(runtime.status())
    }
}
342
/// Starts, reconfigures, and stops the embedded WebRTC peer router.
#[cfg(feature = "p2p")]
pub struct EmbeddedPeerRouterController {
    keys: Keys,
    state: Arc<WebRTCState>,
    store: Arc<dyn ContentStore>,
    peer_classifier: PeerClassifier,
    nostr_relay: Arc<NostrRelay>,
    // `Some` only while a router task is running.
    runtime: Mutex<Option<PeerRouterRuntime>>,
}
352
#[cfg(feature = "p2p")]
impl EmbeddedPeerRouterController {
    /// Build a controller with no router running; call `apply_config` to
    /// start one.
    pub fn new(
        keys: Keys,
        state: Arc<WebRTCState>,
        store: Arc<dyn ContentStore>,
        peer_classifier: PeerClassifier,
        nostr_relay: Arc<NostrRelay>,
    ) -> Self {
        Self {
            keys,
            state,
            store,
            peer_classifier,
            nostr_relay,
            runtime: Mutex::new(None),
        }
    }

    /// Shared handle to the WebRTC state.
    pub fn state(&self) -> Arc<WebRTCState> {
        self.state.clone()
    }

    /// (Re)start the peer router according to `config`.
    ///
    /// Any currently running router is signalled to stop and given 3s to
    /// exit before being aborted, and runtime state is reset either way.
    /// Returns `Ok(false)` when the router is disabled by config, `Ok(true)`
    /// once a new router task has been spawned.
    pub async fn apply_config(&self, config: &Config) -> Result<bool> {
        let mut runtime = self.runtime.lock().await;
        if let Some(runtime_handle) = runtime.take() {
            // Send result ignored: the task may already have exited.
            let _ = runtime_handle.shutdown.send(true);
            let mut join = runtime_handle.join;
            match tokio::time::timeout(std::time::Duration::from_secs(3), &mut join).await {
                Ok(Ok(())) => {}
                Ok(Err(err)) => {
                    tracing::warn!("Peer router task ended with join error: {}", err);
                }
                Err(_) => {
                    tracing::warn!("Timed out waiting for peer router shutdown");
                    join.abort();
                }
            }
        }

        // Clear per-run state even when the router stays disabled.
        self.state.reset_runtime_state().await;

        if !crate::p2p_common::peer_router_enabled(config) {
            return Ok(false);
        }

        let webrtc_config = crate::p2p_common::default_webrtc_config(config);
        let mut manager = PeerRouter::new_with_state_and_store_and_classifier(
            self.keys.clone(),
            webrtc_config,
            self.state.clone(),
            self.store.clone(),
            self.peer_classifier.clone(),
        );
        manager.set_nostr_relay(self.nostr_relay.clone());
        // Grab the shutdown sender before the manager moves into the task.
        let shutdown = manager.shutdown_signal();
        let join = tokio::spawn(async move {
            if let Err(err) = manager.run().await {
                tracing::error!("Peer router error: {}", err);
            }
        });
        *runtime = Some(PeerRouterRuntime { shutdown, join });
        Ok(true)
    }
}
418
/// Options for `start_embedded`.
pub struct EmbeddedDaemonOptions {
    pub config: Config,
    pub data_dir: PathBuf,
    pub bind_address: String,
    // `Some` overrides `config.nostr.relays`; an empty list disables Nostr.
    pub relays: Option<Vec<String>>,
    // Extra axum routes merged into the server.
    pub extra_routes: Option<Router<AppState>>,
    pub cors: Option<CorsLayer>,
}
427
/// Connection info and component handles returned by `start_embedded`.
pub struct EmbeddedDaemonInfo {
    // Actual bound address ("ip:port"), resolved after binding.
    pub addr: String,
    pub port: u16,
    // Bech32-encoded public key of the daemon identity.
    pub npub: String,
    pub store: Arc<HashtreeStore>,
    #[allow(dead_code)]
    pub webrtc_state: Option<Arc<WebRTCState>>,
    #[cfg(feature = "p2p")]
    #[allow(dead_code)]
    pub peer_router_controller: Option<Arc<EmbeddedPeerRouterController>>,
    #[allow(dead_code)]
    pub background_services_controller: Option<Arc<EmbeddedBackgroundServicesController>>,
}
441
442pub async fn start_embedded(opts: EmbeddedDaemonOptions) -> Result<EmbeddedDaemonInfo> {
443    let _ = rustls::crypto::ring::default_provider().install_default();
444
445    let mut config = opts.config;
446    if let Some(relays) = opts.relays {
447        config.nostr.relays = relays;
448        config.nostr.enabled = !config.nostr.relays.is_empty();
449    }
450
451    let max_size_bytes = config.storage.max_size_gb * 1024 * 1024 * 1024;
452    let nostr_db_max_bytes = config
453        .nostr
454        .db_max_size_gb
455        .saturating_mul(1024 * 1024 * 1024);
456    let spambox_db_max_bytes = config
457        .nostr
458        .spambox_max_size_gb
459        .saturating_mul(1024 * 1024 * 1024);
460
461    let store = Arc::new(HashtreeStore::with_options(
462        &opts.data_dir,
463        config.storage.s3.as_ref(),
464        max_size_bytes,
465    )?);
466
467    let (keys, _was_generated) = ensure_keys()?;
468    let pk_bytes = pubkey_bytes(&keys);
469    let npub = keys
470        .public_key()
471        .to_bech32()
472        .context("Failed to encode npub")?;
473
474    let mut allowed_pubkeys: HashSet<String> = HashSet::new();
475    allowed_pubkeys.insert(hex::encode(pk_bytes));
476    for npub_str in &config.nostr.allowed_npubs {
477        if let Ok(pk) = parse_npub(npub_str) {
478            allowed_pubkeys.insert(hex::encode(pk));
479        } else {
480            tracing::warn!("Invalid npub in allowed_npubs: {}", npub_str);
481        }
482    }
483
484    let graph_store = socialgraph::open_social_graph_store_with_storage(
485        &opts.data_dir,
486        store.store_arc(),
487        Some(nostr_db_max_bytes),
488    )
489    .context("Failed to initialize social graph store")?;
490    graph_store.set_profile_index_overmute_threshold(config.nostr.overmute_threshold);
491
492    let social_graph_root_bytes = if let Some(ref root_npub) = config.nostr.socialgraph_root {
493        parse_npub(root_npub).unwrap_or(pk_bytes)
494    } else {
495        pk_bytes
496    };
497    socialgraph::set_social_graph_root(&graph_store, &social_graph_root_bytes);
498    let social_graph_store: Arc<dyn socialgraph::SocialGraphBackend> = graph_store.clone();
499
500    let social_graph = Arc::new(socialgraph::SocialGraphAccessControl::new(
501        Arc::clone(&social_graph_store),
502        config.nostr.max_write_distance,
503        allowed_pubkeys.clone(),
504    ));
505
506    let nostr_relay_config = NostrRelayConfig {
507        spambox_db_max_bytes,
508        ..Default::default()
509    };
510    let mut public_event_pubkeys = HashSet::new();
511    public_event_pubkeys.insert(hex::encode(pk_bytes));
512    let nostr_relay = Arc::new(
513        NostrRelay::new(
514            Arc::clone(&social_graph_store),
515            opts.data_dir.clone(),
516            public_event_pubkeys,
517            Some(social_graph.clone()),
518            nostr_relay_config,
519        )
520        .context("Failed to initialize Nostr relay")?,
521    );
522
523    let crawler_spambox = if spambox_db_max_bytes == 0 {
524        None
525    } else {
526        let spam_dir = opts.data_dir.join("socialgraph_spambox");
527        match socialgraph::open_social_graph_store_at_path(&spam_dir, Some(spambox_db_max_bytes)) {
528            Ok(store) => Some(store),
529            Err(err) => {
530                tracing::warn!("Failed to open social graph spambox for crawler: {}", err);
531                None
532            }
533        }
534    };
535    let crawler_spambox_backend = crawler_spambox
536        .clone()
537        .map(|store| store as Arc<dyn socialgraph::SocialGraphBackend>);
538
539    #[cfg(feature = "p2p")]
540    let (webrtc_state, peer_router_controller): (
541        Option<Arc<WebRTCState>>,
542        Option<Arc<EmbeddedPeerRouterController>>,
543    ) = {
544        let router_config = crate::p2p_common::default_webrtc_config(&config);
545        let peer_classifier = crate::p2p_common::build_peer_classifier(
546            opts.data_dir.clone(),
547            Arc::clone(&social_graph_store),
548        );
549        let cashu_payment_client =
550            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
551                match crate::cashu_helper::CashuHelperClient::discover(opts.data_dir.clone()) {
552                    Ok(client) => {
553                        Some(Arc::new(client) as Arc<dyn crate::cashu_helper::CashuPaymentClient>)
554                    }
555                    Err(err) => {
556                        tracing::warn!(
557                        "Cashu settlement helper unavailable; paid retrieval stays disabled: {}",
558                        err
559                    );
560                        None
561                    }
562                }
563            } else {
564                None
565            };
566        let cashu_mint_metadata =
567            if config.cashu.default_mint.is_some() || !config.cashu.accepted_mints.is_empty() {
568                let metadata_path = crate::webrtc::cashu_mint_metadata_path(&opts.data_dir);
569                match crate::webrtc::CashuMintMetadataStore::load(metadata_path) {
570                    Ok(store) => Some(store),
571                    Err(err) => {
572                        tracing::warn!(
573                        "Failed to load Cashu mint metadata; falling back to in-memory state: {}",
574                        err
575                    );
576                        Some(crate::webrtc::CashuMintMetadataStore::in_memory())
577                    }
578                }
579            } else {
580                None
581            };
582
583        let state = Arc::new(WebRTCState::new_with_routing_and_cashu(
584            router_config.request_selection_strategy,
585            router_config.request_fairness_enabled,
586            router_config.request_dispatch,
587            std::time::Duration::from_millis(router_config.message_timeout_ms),
588            crate::webrtc::CashuRoutingConfig::from(&config.cashu),
589            cashu_payment_client,
590            cashu_mint_metadata,
591        ));
592        let controller = Arc::new(EmbeddedPeerRouterController::new(
593            keys.clone(),
594            state.clone(),
595            Arc::clone(&store) as Arc<dyn ContentStore>,
596            peer_classifier,
597            nostr_relay.clone(),
598        ));
599        controller.apply_config(&config).await?;
600        (Some(state), Some(controller))
601    };
602
603    #[cfg(not(feature = "p2p"))]
604    let webrtc_state: Option<Arc<crate::webrtc::WebRTCState>> = None;
605    #[cfg(not(feature = "p2p"))]
606    let peer_router_controller = None;
607
608    let background_services_controller = Arc::new(EmbeddedBackgroundServicesController::new(
609        keys.clone(),
610        opts.data_dir.clone(),
611        Arc::clone(&store),
612        graph_store.clone(),
613        Arc::clone(&social_graph_store),
614        crawler_spambox_backend,
615        webrtc_state.clone(),
616    ));
617    background_services_controller.apply_config(&config).await?;
618
619    let upstream_blossom = config.blossom.all_read_servers();
620    let active_nostr_relays = config.nostr.active_relays();
621
622    let mut server = HashtreeServer::new(Arc::clone(&store), opts.bind_address.clone())
623        .with_allowed_pubkeys(allowed_pubkeys.clone())
624        .with_max_upload_bytes((config.blossom.max_upload_mb as usize) * 1024 * 1024)
625        .with_public_writes(config.server.public_writes)
626        .with_upstream_blossom(upstream_blossom)
627        .with_nostr_relay_urls(active_nostr_relays)
628        .with_social_graph(social_graph)
629        .with_socialgraph_snapshot(
630            Arc::clone(&social_graph_store),
631            social_graph_root_bytes,
632            config.server.socialgraph_snapshot_public,
633        )
634        .with_nostr_relay(nostr_relay.clone());
635
636    if let Some(ref state) = webrtc_state {
637        server = server.with_webrtc_peers(state.clone());
638    }
639
640    if let Some(extra) = opts.extra_routes {
641        server = server.with_extra_routes(extra);
642    }
643    if let Some(cors) = opts.cors {
644        server = server.with_cors(cors);
645    }
646
647    spawn_background_eviction_task(
648        Arc::clone(&store),
649        BACKGROUND_EVICTION_INTERVAL,
650        "embedded daemon",
651    );
652
653    let listener = TcpListener::bind(&opts.bind_address).await?;
654    let local_addr = listener.local_addr()?;
655    let actual_addr = format!("{}:{}", local_addr.ip(), local_addr.port());
656
657    tokio::spawn(async move {
658        if let Err(e) = server.run_with_listener(listener).await {
659            tracing::error!("Embedded daemon server error: {}", e);
660        }
661    });
662
663    tracing::info!(
664        "Embedded daemon started on {}, identity {}",
665        actual_addr,
666        npub
667    );
668
669    Ok(EmbeddedDaemonInfo {
670        addr: actual_addr,
671        port: local_addr.port(),
672        npub,
673        store,
674        webrtc_state,
675        #[cfg(feature = "p2p")]
676        peer_router_controller,
677        background_services_controller: Some(background_services_controller),
678    })
679}