nu_zenoh/
lib.rs

1//
2// Copyright (c) 2025 ZettaScale Technology
3//
4// This program and the accompanying materials are made available under the
5// terms of the Eclipse Public License 2.0 which is available at
6// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
7// which is available at https://www.apache.org/licenses/LICENSE-2.0.
8//
9// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
10//
11// Contributors:
12//   ZettaScale Zenoh Team, <zenoh@zettascale.tech>
13//
14use std::{
15    collections::HashMap,
16    sync::{Arc, RwLock},
17};
18
19use nu_protocol::{
20    engine::{EngineState, StateWorkingSet},
21    LabeledError,
22};
23use zenoh::{internal::runtime::Runtime, Session, Wait};
24
25mod call_ext2;
26mod cmd;
27mod conv;
28mod interruptible_channel;
29mod signature_ext;
30
31#[derive(Debug, Clone)]
32pub struct Config {
33    pub experimental_options: bool,
34    pub no_default_session: bool,
35}
36
37/// Adds extra context (e.g. aliases) as Nu source code
38///
39/// This should be called after [`crate::add_zenoh_context`].
40pub const ZENOH_CONTEXT_EXTRAS: &[u8] = include_bytes!("nu/extras.nu");
41
42/// Adds all `zenoh *` commands to the given [`nu_protocol::engine::EngineState`].
43pub fn add_zenoh_context(mut engine_state: EngineState, options: Config) -> EngineState {
44    let delta = {
45        let mut working_set = StateWorkingSet::new(&engine_state);
46
47        let state = State::new(options.clone());
48
49        if options.experimental_options {
50            working_set.add_decl(Box::new(cmd::runtime::list::List::new(state.clone())));
51            working_set.add_decl(Box::new(cmd::runtime::open::Open::new(state.clone())));
52            working_set.add_decl(Box::new(cmd::runtime::close::Close::new(state.clone())));
53
54            working_set.add_decl(Box::new(cmd::pub_::Pub::new(state.clone())));
55            working_set.add_decl(Box::new(cmd::querier::Querier::new(state.clone())));
56
57            working_set.add_decl(Box::new(cmd::liveliness::declare_token::DeclareToken::new(
58                state.clone(),
59            )));
60            working_set.add_decl(Box::new(
61                cmd::liveliness::undeclare_token::UndeclareToken::new(state.clone()),
62            ));
63            working_set.add_decl(Box::new(cmd::liveliness::sub::Sub::new(state.clone())));
64
65            working_set.add_decl(Box::new(cmd::pub_::MatchingStatus::new(state.clone())));
66            working_set.add_decl(Box::new(cmd::querier::MatchingStatus::new(state.clone())));
67        }
68
69        working_set.add_decl(Box::new(cmd::put::Put::new(state.clone())));
70        working_set.add_decl(Box::new(cmd::delete::Delete::new(state.clone())));
71        working_set.add_decl(Box::new(cmd::get::Get::new(state.clone())));
72        working_set.add_decl(Box::new(cmd::sub::Sub::new(state.clone())));
73        working_set.add_decl(Box::new(cmd::zid::Zid::new(state.clone())));
74
75        working_set.add_decl(Box::new(cmd::liveliness::get::Get::new(state.clone())));
76        working_set.add_decl(Box::new(cmd::session::list::List::new(state.clone())));
77        working_set.add_decl(Box::new(cmd::session::open::Open::new(state.clone())));
78        working_set.add_decl(Box::new(cmd::session::close::Close::new(state.clone())));
79
80        working_set.add_decl(Box::new(cmd::log_path::LogPath::new(state.clone())));
81        working_set.add_decl(Box::new(cmd::queryable::Queryable::new(state.clone())));
82        working_set.add_decl(Box::new(cmd::scout::Scout::new(state.clone())));
83        working_set.add_decl(Box::new(cmd::info::Info::new(state.clone())));
84        working_set.add_decl(Box::new(cmd::config::Config::new(state)));
85
86        working_set.add_decl(Box::new(cmd::keyexpr::Includes));
87        working_set.add_decl(Box::new(cmd::keyexpr::Intersects));
88
89        working_set.render()
90    };
91
92    if let Err(err) = engine_state.merge_delta(delta) {
93        eprintln!("Error creating Zenoh command context: {err:?}");
94    }
95
96    engine_state
97}
98
99#[derive(Clone)]
100struct State {
101    options: Config,
102    sessions: Arc<RwLock<HashMap<String, Session>>>,
103    runtimes: Arc<RwLock<HashMap<String, Runtime>>>,
104}
105
106impl State {
107    const DEFAULT_SESSION_NAME: &str = "default";
108
109    fn new(options: Config) -> Self {
110        let mut sessions = HashMap::new();
111        if !options.no_default_session {
112            let default_session = zenoh::open(zenoh::Config::default())
113                .wait()
114                .expect("could not open default session");
115            sessions.insert(Self::DEFAULT_SESSION_NAME.to_string(), default_session);
116        }
117
118        Self {
119            options,
120            sessions: Arc::new(RwLock::new(sessions)),
121            runtimes: Arc::new(RwLock::new(HashMap::new())),
122        }
123    }
124}
125
126impl State {
127    pub(crate) fn with_session<F, T>(&self, name: &str, f: F) -> Result<T, LabeledError>
128    where
129        F: FnOnce(&Session) -> T,
130    {
131        let sessions = self.sessions.read().unwrap();
132        let session = sessions
133            .get(name)
134            .ok_or_else(|| LabeledError::new(format!("session '{name}' not found")))?;
135        Ok(f(session))
136    }
137}