pub struct KitsuneHostImpl { /* private fields */ }
Expand description

Implementation of the Kitsune Host API. Lets Kitsune make requests of Holochain

Implementations§

Constructor

Examples found in repository?
src/conductor/conductor/builder.rs (lines 119-124)
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
    pub async fn build(self) -> ConductorResult<ConductorHandle> {
        tracing::info!(?self.config);

        let keystore = if let Some(keystore) = self.keystore {
            keystore
        } else {
            pub(crate) fn warn_no_encryption() {
                #[cfg(not(feature = "db-encryption"))]
                {
                    const MSG: &str = "WARNING: running without local db encryption";
                    eprintln!("{}", MSG);
                    println!("{}", MSG);
                    tracing::warn!("{}", MSG);
                }
            }
            let get_passphrase = || -> ConductorResult<sodoken::BufRead> {
                match self.passphrase {
                    None => Err(
                        one_err::OneErr::new("passphrase required for lair keystore api").into(),
                    ),
                    Some(p) => Ok(p),
                }
            };
            match &self.config.keystore {
                KeystoreConfig::DangerTestKeystore => spawn_test_keystore().await?,
                KeystoreConfig::LairServer { connection_url } => {
                    warn_no_encryption();
                    let passphrase = get_passphrase()?;
                    spawn_lair_keystore(connection_url.clone(), passphrase).await?
                }
                KeystoreConfig::LairServerInProc { lair_root } => {
                    warn_no_encryption();
                    let mut keystore_config_path = lair_root.clone().unwrap_or_else(|| {
                        let mut p: std::path::PathBuf = self.config.environment_path.clone().into();
                        p.push("keystore");
                        p
                    });
                    keystore_config_path.push("lair-keystore-config.yaml");
                    let passphrase = get_passphrase()?;
                    spawn_lair_keystore_in_proc(keystore_config_path, passphrase).await?
                }
            }
        };

        let Self {
            ribosome_store,
            config,
            ..
        } = self;

        let ribosome_store = RwShare::new(ribosome_store);

        let spaces = Spaces::new(&config)?;
        let tag = spaces.get_state().await?.tag().clone();

        let network_config = config.network.clone().unwrap_or_default();
        let (cert_digest, cert, cert_priv_key) =
            keystore.get_or_create_tls_cert_by_tag(tag.0).await?;
        let tls_config =
            holochain_p2p::kitsune_p2p::dependencies::kitsune_p2p_types::tls::TlsConfig {
                cert,
                cert_priv_key,
                cert_digest,
            };
        let strat = ArqStrat::from_params(network_config.tuning_params.gossip_redundancy_target);

        let host = KitsuneHostImpl::new(
            spaces.clone(),
            ribosome_store.clone(),
            network_config.tuning_params.clone(),
            strat,
        );

        let (holochain_p2p, p2p_evt) =
            holochain_p2p::spawn_holochain_p2p(network_config, tls_config, host).await?;

        let (post_commit_sender, post_commit_receiver) =
            tokio::sync::mpsc::channel(POST_COMMIT_CHANNEL_BOUND);

        let conductor = Conductor::new(
            config.clone(),
            ribosome_store,
            keystore,
            holochain_p2p,
            spaces,
            post_commit_sender,
        );

        let shutting_down = conductor.shutting_down.clone();

        #[cfg(any(test, feature = "test_utils"))]
        let conductor = Self::update_fake_state(self.state, conductor).await?;

        // Create handle
        let handle: ConductorHandle = Arc::new(conductor);

        {
            let handle = handle.clone();
            tokio::task::spawn(async move {
                while !shutting_down.load(std::sync::atomic::Ordering::Relaxed) {
                    tokio::time::sleep(std::time::Duration::from_secs(60)).await;
                    if let Err(e) = handle.prune_p2p_agents_db().await {
                        tracing::error!("failed to prune p2p_agents_db: {:?}", e);
                    }
                }
            });
        }

        Self::finish(
            handle,
            config,
            p2p_evt,
            post_commit_receiver,
            self.no_print_setup,
        )
        .await
    }

    pub(crate) fn spawn_post_commit(
        conductor_handle: ConductorHandle,
        receiver: tokio::sync::mpsc::Receiver<PostCommitArgs>,
    ) {
        let receiver_stream = tokio_stream::wrappers::ReceiverStream::new(receiver);
        tokio::task::spawn(receiver_stream.for_each_concurrent(
            POST_COMMIT_CONCURRENT_LIMIT,
            move |post_commit_args| {
                let conductor_handle = conductor_handle.clone();
                async move {
                    let PostCommitArgs {
                        host_access,
                        invocation,
                        cell_id,
                    } = post_commit_args;
                    match conductor_handle.clone().get_ribosome(cell_id.dna_hash()) {
                        Ok(ribosome) => {
                            if let Err(e) = tokio::task::spawn_blocking(move || {
                                if let Err(e) = ribosome.run_post_commit(host_access, invocation) {
                                    tracing::error!(?e);
                                }
                            })
                            .await
                            {
                                tracing::error!(?e);
                            }
                        }
                        Err(e) => {
                            tracing::error!(?e);
                        }
                    }
                }
            },
        ));
    }

    pub(crate) async fn finish(
        conductor: ConductorHandle,
        conductor_config: ConductorConfig,
        p2p_evt: holochain_p2p::event::HolochainP2pEventReceiver,
        post_commit_receiver: tokio::sync::mpsc::Receiver<PostCommitArgs>,
        no_print_setup: bool,
    ) -> ConductorResult<ConductorHandle> {
        conductor
            .clone()
            .start_scheduler(holochain_zome_types::schedule::SCHEDULER_INTERVAL)
            .await;

        tokio::task::spawn(p2p_event_task(p2p_evt, conductor.clone()));

        Self::spawn_post_commit(conductor.clone(), post_commit_receiver);

        let configs = conductor_config.admin_interfaces.unwrap_or_default();
        let cell_startup_errors = conductor.clone().initialize_conductor(configs).await?;

        // TODO: This should probably be emitted over the admin interface
        if !cell_startup_errors.is_empty() {
            error!(
                msg = "Failed to create the following active apps",
                ?cell_startup_errors
            );
        }

        if !no_print_setup {
            conductor.print_setup();
        }

        Ok(conductor)
    }

    /// Pass a test keystore in, to ensure that generated test agents
    /// are actually available for signing (especially for tryorama compat)
    pub fn with_keystore(mut self, keystore: MetaLairClient) -> Self {
        self.keystore = Some(keystore);
        self
    }

    #[cfg(any(test, feature = "test_utils"))]
    /// Sets some fake conductor state for tests
    pub fn fake_state(mut self, state: ConductorState) -> Self {
        self.state = Some(state);
        self
    }

    #[cfg(any(test, feature = "test_utils"))]
    pub(crate) async fn update_fake_state(
        state: Option<ConductorState>,
        conductor: Conductor,
    ) -> ConductorResult<Conductor> {
        if let Some(state) = state {
            conductor.update_state(move |_| Ok(state)).await?;
        }
        Ok(conductor)
    }

    /// Build a Conductor with a test environment
    #[cfg(any(test, feature = "test_utils"))]
    pub async fn test(
        mut self,
        env_path: &std::path::Path,
        extra_dnas: &[DnaFile],
    ) -> ConductorResult<ConductorHandle> {
        let keystore = self.keystore.unwrap_or_else(test_keystore);
        self.config.environment_path = env_path.to_path_buf().into();

        let spaces = Spaces::new(&self.config)?;

        let network_config = self.config.network.clone().unwrap_or_default();
        let tuning_params = network_config.tuning_params.clone();
        let strat = ArqStrat::from_params(tuning_params.gossip_redundancy_target);

        let ribosome_store = RwShare::new(self.ribosome_store);
        let host =
            KitsuneHostImpl::new(spaces.clone(), ribosome_store.clone(), tuning_params, strat);

        let (holochain_p2p, p2p_evt) =
                holochain_p2p::spawn_holochain_p2p(network_config, holochain_p2p::kitsune_p2p::dependencies::kitsune_p2p_types::tls::TlsConfig::new_ephemeral().await.unwrap(), host)
                    .await?;

        let (post_commit_sender, post_commit_receiver) =
            tokio::sync::mpsc::channel(POST_COMMIT_CHANNEL_BOUND);

        let conductor = Conductor::new(
            self.config.clone(),
            ribosome_store,
            keystore,
            holochain_p2p,
            spaces,
            post_commit_sender,
        );

        let conductor = Self::update_fake_state(self.state, conductor).await?;

        // Create handle
        let handle: ConductorHandle = Arc::new(conductor);

        // Install extra DNAs, in particular:
        // the ones with InlineZomes will not be registered in the Wasm DB
        // and cannot be automatically loaded on conductor restart.

        for dna_file in extra_dnas {
            handle
                .register_dna(dna_file.clone())
                .await
                .expect("Could not install DNA");
        }

        Self::finish(
            handle,
            self.config,
            p2p_evt,
            post_commit_receiver,
            self.no_print_setup,
        )
        .await
    }

Trait Implementations§

Extrapolated Peer Coverage.
Record a set of metric records.
We need to get previously stored agent info.
Query aggregate dht op data to form an LTCS set of region data.
Given an input list of regions, return a list of equal or greater length such that each region’s size is less than the size_limit, by recursively subdividing regions which are over the size limit.
Get all op hashes within a region
Get the quantum Topology associated with this Space.
Hashing function to get an op_hash from op_data.
Check which hashes we have data for.

Auto Trait Implementations§

Blanket Implementations§

Gets the TypeId of self. Read more
TODO: once 1.33.0 is the minimum supported compiler version, remove Any::type_id_compat and use StdAny::type_id instead. https://github.com/rust-lang/rust/issues/27745
The archived version of the pointer metadata for this type.
Converts some archived metadata to the pointer metadata for itself.
Immutably borrows from an owned value. Read more
Mutably borrows from an owned value. Read more
Deserializes using the given deserializer

Returns the argument unchanged.

Attaches the provided Context to this type, returning a WithContext wrapper. Read more
Attaches the current Context to this type, returning a WithContext wrapper. Read more
Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

The alignment of pointer.
The type for initializers.
Initializes a with the given initializer. Read more
Dereferences the given pointer. Read more
Mutably dereferences the given pointer. Read more
Drops the object pointed to by the given pointer. Read more
The type for metadata in pointers and references to Self.
Should always be Self
The inverse inclusion map: attempts to construct self from the equivalent element of its superset. Read more
Checks if self is actually part of its subset T (and can be converted to it).
Use with care! Same as self.to_subset but without any property checks. Always succeeds.
The inclusion map: converts self to the equivalent element of its superset.
The type returned in the event of a conversion error.
Performs the conversion.
The type returned in the event of a conversion error.
Performs the conversion.
upcast ref
upcast mut ref
upcast boxed dyn
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more