Struct holochain::conductor::manager::ManagedTaskAdd
source · pub struct ManagedTaskAdd { /* private fields */ }
Expand description
A message sent to the TaskManager, registering an ManagedTask of a given kind.
Implementations§
source§impl ManagedTaskAdd
impl ManagedTaskAdd
sourcepub fn ignore(handle: JoinHandle<ManagedTaskResult>, name: &str) -> Self
pub fn ignore(handle: JoinHandle<ManagedTaskResult>, name: &str) -> Self
You just want the task in the task manager but don’t want to react to an error
Examples found in repository?
src/conductor/conductor.rs (lines 421-424)
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496
pub(crate) async fn add_admin_interfaces(
self: Arc<Self>,
configs: Vec<AdminInterfaceConfig>,
) -> ConductorResult<()> {
let admin_api = RealAdminInterfaceApi::new(self.clone());
let stop_tx = self.task_manager.share_ref(|tm| {
tm.as_ref()
.expect("Task manager not started yet")
.task_stop_broadcaster()
.clone()
});
// Closure to process each admin config item
let spawn_from_config = |AdminInterfaceConfig { driver, .. }| {
let admin_api = admin_api.clone();
let stop_tx = stop_tx.clone();
async move {
match driver {
InterfaceDriver::Websocket { port } => {
let (listener_handle, listener) =
spawn_websocket_listener(port).await?;
let port = listener_handle.local_addr().port().unwrap_or(port);
let handle: ManagedTaskHandle = spawn_admin_interface_task(
listener_handle,
listener,
admin_api.clone(),
stop_tx.subscribe(),
)?;
InterfaceResult::Ok((port, handle))
}
}
}
};
// spawn interface tasks, collect their JoinHandles,
// panic on errors.
let handles: Result<Vec<_>, _> =
future::join_all(configs.into_iter().map(spawn_from_config))
.await
.into_iter()
.collect();
// Exit if the admin interfaces fail to be created
let handles = handles.map_err(Box::new)?;
{
let mut ports = Vec::new();
// First, register the keepalive task, to ensure the conductor doesn't shut down
// in the absence of other "real" tasks
self.manage_task(ManagedTaskAdd::ignore(
tokio::spawn(keep_alive_task(stop_tx.subscribe())),
"keepalive task",
))
.await?;
// Now that tasks are spawned, register them with the TaskManager
for (port, handle) in handles {
ports.push(port);
self.manage_task(ManagedTaskAdd::ignore(
handle,
&format!("admin interface, port {}", port),
))
.await?
}
for p in ports {
self.add_admin_port(p);
}
}
Ok(())
}
/// Spawn a new app interface task, register it with the TaskManager,
/// and modify the conductor accordingly, based on the config passed in
/// which is just a networking port number (or 0 to auto-select one).
/// Returns the given or auto-chosen port number if giving an Ok Result
pub async fn add_app_interface(
self: Arc<Self>,
port: either::Either<u16, AppInterfaceId>,
) -> ConductorResult<u16> {
let interface_id = match port {
either::Either::Left(port) => AppInterfaceId::new(port),
either::Either::Right(id) => id,
};
let port = interface_id.port();
tracing::debug!("Attaching interface {}", port);
let app_api = RealAppInterfaceApi::new(self.clone());
// This receiver is thrown away because we can produce infinite new
// receivers from the Sender
let (signal_tx, _r) = tokio::sync::broadcast::channel(SIGNAL_BUFFER_SIZE);
let stop_rx = self.task_manager.share_ref(|tm| {
tm.as_ref()
.expect("Task manager not initialized")
.task_stop_broadcaster()
.subscribe()
});
let (port, task) = spawn_app_interface_task(port, app_api, signal_tx.clone(), stop_rx)
.await
.map_err(Box::new)?;
// TODO: RELIABILITY: Handle this task by restarting it if it fails and log the error
self.manage_task(ManagedTaskAdd::ignore(
task,
&format!("app interface, port {}", port),
))
.await?;
let interface = AppInterfaceRuntime::Websocket { signal_tx };
self.app_interfaces.share_mut(|app_interfaces| {
if app_interfaces.contains_key(&interface_id) {
return Err(ConductorError::AppInterfaceIdCollision(
interface_id.clone(),
));
}
app_interfaces.insert(interface_id.clone(), interface);
Ok(())
})?;
let config = AppInterfaceConfig::websocket(port);
self.update_state(|mut state| {
state.app_interfaces.insert(interface_id, config);
Ok(state)
})
.await?;
tracing::debug!("App interface added at port: {}", port);
Ok(port)
}
sourcepub fn unrecoverable(handle: JoinHandle<ManagedTaskResult>, name: &str) -> Self
pub fn unrecoverable(handle: JoinHandle<ManagedTaskResult>, name: &str) -> Self
If this task fails, the entire conductor must be shut down
sourcepub fn cell_critical(
handle: JoinHandle<ManagedTaskResult>,
cell_id: CellId,
name: &str
) -> Self
pub fn cell_critical(
handle: JoinHandle<ManagedTaskResult>,
cell_id: CellId,
name: &str
) -> Self
If this task fails, only the Cell which it runs under must be stopped
Examples found in repository?
src/core/queue_consumer.rs (lines 105-109)
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
pub async fn spawn_queue_consumer_tasks(
cell_id: CellId,
network: HolochainP2pDna,
space: &Space,
conductor_handle: ConductorHandle,
task_sender: sync::mpsc::Sender<ManagedTaskAdd>,
stop: sync::broadcast::Sender<()>,
) -> (QueueTriggers, InitialQueueTriggers) {
let Space {
authored_db,
dht_db,
cache_db: cache,
dht_query_cache,
..
} = space;
let keystore = conductor_handle.keystore().clone();
let dna_hash = Arc::new(cell_id.dna_hash().clone());
let queue_consumer_map = conductor_handle.get_queue_consumer_workflows();
// Publish
let (tx_publish, handle) = spawn_publish_dht_ops_consumer(
cell_id.agent_pubkey().clone(),
authored_db.clone(),
conductor_handle.clone(),
stop.subscribe(),
Box::new(network.clone()),
);
task_sender
.send(ManagedTaskAdd::cell_critical(
handle,
cell_id.clone(),
"publish_dht_ops_consumer",
))
.await
.expect("Failed to manage workflow handle");
// Validation Receipt
// One per space.
let (tx_receipt, handle) =
queue_consumer_map.spawn_once_validation_receipt(dna_hash.clone(), || {
spawn_validation_receipt_consumer(
dna_hash.clone(),
dht_db.clone(),
conductor_handle.clone(),
stop.subscribe(),
network.clone(),
)
});
if let Some(handle) = handle {
task_sender
.send(ManagedTaskAdd::cell_critical(
handle,
cell_id.clone(),
"validation_receipt_consumer",
))
.await
.expect("Failed to manage workflow handle");
}
// Integration
// One per space.
let (tx_integration, handle) =
queue_consumer_map.spawn_once_integration(dna_hash.clone(), || {
spawn_integrate_dht_ops_consumer(
dna_hash.clone(),
dht_db.clone(),
dht_query_cache.clone(),
stop.subscribe(),
tx_receipt.clone(),
network.clone(),
)
});
if let Some(handle) = handle {
task_sender
.send(ManagedTaskAdd::cell_critical(
handle,
cell_id.clone(),
"integrate_dht_ops_consumer",
))
.await
.expect("Failed to manage workflow handle");
}
let dna_def = conductor_handle
.get_dna_def(&*dna_hash)
.expect("Dna must be in store");
// App validation
// One per space.
let (tx_app, handle) = queue_consumer_map.spawn_once_app_validation(dna_hash.clone(), || {
spawn_app_validation_consumer(
dna_hash.clone(),
AppValidationWorkspace::new(
authored_db.clone().into(),
dht_db.clone(),
space.dht_query_cache.clone(),
cache.clone(),
keystore.clone(),
Arc::new(dna_def),
),
conductor_handle.clone(),
stop.subscribe(),
tx_integration.clone(),
network.clone(),
dht_query_cache.clone(),
)
});
if let Some(handle) = handle {
task_sender
.send(ManagedTaskAdd::cell_critical(
handle,
cell_id.clone(),
"app_validation_consumer",
))
.await
.expect("Failed to manage workflow handle");
}
let dna_def = conductor_handle
.get_dna_def(&*dna_hash)
.expect("Dna must be in store");
// Sys validation
// One per space.
let (tx_sys, handle) = queue_consumer_map.spawn_once_sys_validation(dna_hash.clone(), || {
spawn_sys_validation_consumer(
SysValidationWorkspace::new(
authored_db.clone().into(),
dht_db.clone().into(),
dht_query_cache.clone(),
cache.clone(),
Arc::new(dna_def),
),
space.clone(),
conductor_handle.clone(),
stop.subscribe(),
tx_app.clone(),
network.clone(),
)
});
if let Some(handle) = handle {
task_sender
.send(ManagedTaskAdd::cell_critical(
handle,
cell_id.clone(),
"sys_validation_consumer",
))
.await
.expect("Failed to manage workflow handle");
}
let (tx_cs, handle) = queue_consumer_map.spawn_once_countersigning(dna_hash.clone(), || {
spawn_countersigning_consumer(
space.clone(),
stop.subscribe(),
network.clone(),
tx_sys.clone(),
)
});
if let Some(handle) = handle {
task_sender
.send(ManagedTaskAdd::cell_critical(
handle,
cell_id.clone(),
"countersigning_consumer",
))
.await
.expect("Failed to manage workflow handle");
}
(
QueueTriggers {
sys_validation: tx_sys.clone(),
publish_dht_ops: tx_publish.clone(),
countersigning: tx_cs,
integrate_dht_ops: tx_integration.clone(),
},
InitialQueueTriggers::new(tx_sys, tx_publish, tx_app, tx_integration, tx_receipt),
)
}
sourcepub fn dna_critical(
handle: JoinHandle<ManagedTaskResult>,
dna_hash: Arc<DnaHash>,
name: &str
) -> Self
pub fn dna_critical(
handle: JoinHandle<ManagedTaskResult>,
dna_hash: Arc<DnaHash>,
name: &str
) -> Self
If this task fails, only the Cells with this DnaHash must be stopped
sourcepub fn generic(handle: JoinHandle<ManagedTaskResult>, f: OnDeath) -> Self
pub fn generic(handle: JoinHandle<ManagedTaskResult>, f: OnDeath) -> Self
Handle a task’s completion with a generic callback
Trait Implementations§
source§impl Debug for ManagedTaskAdd
impl Debug for ManagedTaskAdd
source§impl Future for ManagedTaskAdd
impl Future for ManagedTaskAdd
Auto Trait Implementations§
impl !RefUnwindSafe for ManagedTaskAdd
impl Send for ManagedTaskAdd
impl Sync for ManagedTaskAdd
impl Unpin for ManagedTaskAdd
impl !UnwindSafe for ManagedTaskAdd
Blanket Implementations§
§impl<T> Any for Twhere
T: Any + ?Sized,
impl<T> Any for Twhere
T: Any + ?Sized,
§fn type_id_compat(&self) -> TypeId
fn type_id_compat(&self) -> TypeId
TODO: once 1.33.0 is the minimum supported compiler version, remove
Any::type_id_compat and use StdAny::type_id instead.
https://github.com/rust-lang/rust/issues/27745
§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
§type ArchivedMetadata = ()
type ArchivedMetadata = ()
The archived version of the pointer metadata for this type.
§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata
) -> <T as Pointee>::Metadata
fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata
) -> <T as Pointee>::Metadata
Converts some archived metadata to the pointer metadata for itself.
§impl<F, W, T, D> Deserialize<With<T, W>, D> for Fwhere
W: DeserializeWith<F, T, D>,
D: Fallible + ?Sized,
F: ?Sized,
impl<F, W, T, D> Deserialize<With<T, W>, D> for Fwhere
W: DeserializeWith<F, T, D>,
D: Fallible + ?Sized,
F: ?Sized,
§fn deserialize(
&self,
deserializer: &mut D
) -> Result<With<T, W>, <D as Fallible>::Error>
fn deserialize(
&self,
deserializer: &mut D
) -> Result<With<T, W>, <D as Fallible>::Error>
Deserializes using the given deserializer
§impl<T> FutureExt for Twhere
T: Future + ?Sized,
impl<T> FutureExt for Twhere
T: Future + ?Sized,
§fn map<U, F>(self, f: F) -> Map<Self, F> ⓘwhere
F: FnOnce(Self::Output) -> U,
Self: Sized,
fn map<U, F>(self, f: F) -> Map<Self, F> ⓘwhere
F: FnOnce(Self::Output) -> U,
Self: Sized,
Map this future’s output to a different type, returning a new future of
the resulting type. Read more
§fn map_into<U>(self) -> MapInto<Self, U> ⓘwhere
Self::Output: Into<U>,
Self: Sized,
fn map_into<U>(self) -> MapInto<Self, U> ⓘwhere
Self::Output: Into<U>,
Self: Sized,
Map this future’s output to a different type, returning a new future of
the resulting type. Read more
§fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> ⓘwhere
F: FnOnce(Self::Output) -> Fut,
Fut: Future,
Self: Sized,
fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F> ⓘwhere
F: FnOnce(Self::Output) -> Fut,
Fut: Future,
Self: Sized,
Chain on a computation for when a future finished, passing the result of
the future to the provided closure
f
. Read more§fn into_stream(self) -> IntoStream<Self>where
Self: Sized,
fn into_stream(self) -> IntoStream<Self>where
Self: Sized,
Convert this future into a single element stream. Read more
§fn flatten(self) -> Flatten<Self> ⓘwhere
Self::Output: Future,
Self: Sized,
fn flatten(self) -> Flatten<Self> ⓘwhere
Self::Output: Future,
Self: Sized,
Flatten the execution of this future when the output of this
future is itself another future. Read more
§fn flatten_stream(self) -> FlattenStream<Self>where
Self::Output: Stream,
Self: Sized,
fn flatten_stream(self) -> FlattenStream<Self>where
Self::Output: Stream,
Self: Sized,
Flatten the execution of this future when the successful result of this
future is a stream. Read more
§fn fuse(self) -> Fuse<Self> ⓘwhere
Self: Sized,
fn fuse(self) -> Fuse<Self> ⓘwhere
Self: Sized,
Fuse a future such that
poll
will never again be called once it has
completed. This method can be used to turn any Future
into a
FusedFuture
. Read more§fn inspect<F>(self, f: F) -> Inspect<Self, F> ⓘwhere
F: FnOnce(&Self::Output),
Self: Sized,
fn inspect<F>(self, f: F) -> Inspect<Self, F> ⓘwhere
F: FnOnce(&Self::Output),
Self: Sized,
Do something with the output of a future before passing it on. Read more
§fn catch_unwind(self) -> CatchUnwind<Self> ⓘwhere
Self: Sized + UnwindSafe,
fn catch_unwind(self) -> CatchUnwind<Self> ⓘwhere
Self: Sized + UnwindSafe,
Catches unwinding panics while polling the future. Read more
Create a cloneable handle to this future where all handles will resolve
to the same result. Read more
§fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>)where
Self: Sized,
fn remote_handle(self) -> (Remote<Self>, RemoteHandle<Self::Output>)where
Self: Sized,
Turn this future into a future that yields
()
on completion and sends
its output to another future on a separate task. Read more§fn boxed<'a>(
self
) -> Pin<Box<dyn Future<Output = Self::Output> + Send + 'a, Global>>where
Self: 'a + Sized + Send,
fn boxed<'a>(
self
) -> Pin<Box<dyn Future<Output = Self::Output> + Send + 'a, Global>>where
Self: 'a + Sized + Send,
Wrap the future in a Box, pinning it. Read more
§fn boxed_local<'a>(
self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a, Global>>where
Self: 'a + Sized,
fn boxed_local<'a>(
self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a, Global>>where
Self: 'a + Sized,
Wrap the future in a Box, pinning it. Read more
§fn unit_error(self) -> UnitError<Self> ⓘwhere
Self: Sized,
fn unit_error(self) -> UnitError<Self> ⓘwhere
Self: Sized,
Turns a
Future<Output = T>
into a
TryFuture<Ok = T, Error = ()
>.§fn never_error(self) -> NeverError<Self> ⓘwhere
Self: Sized,
fn never_error(self) -> NeverError<Self> ⓘwhere
Self: Sized,
Turns a
Future<Output = T>
into a
TryFuture<Ok = T, Error = Never
>.§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self> ⓘ
fn with_context(self, otel_cx: Context) -> WithContext<Self> ⓘ
§fn with_current_context(self) -> WithContext<Self> ⓘ
fn with_current_context(self) -> WithContext<Self> ⓘ
§impl<T> FutureExt for Twhere
T: Future + ?Sized,
impl<T> FutureExt for Twhere
T: Future + ?Sized,
§fn delay(self, dur: Duration) -> DelayFuture<Self>where
Self: Sized,
fn delay(self, dur: Duration) -> DelayFuture<Self>where
Self: Sized,
Returns a Future that delays execution for a specified time. Read more
§fn flatten(self) -> FlattenFuture<Self, <Self::Output as IntoFuture>::Future>where
Self: Sized,
Self::Output: IntoFuture,
fn flatten(self) -> FlattenFuture<Self, <Self::Output as IntoFuture>::Future>where
Self: Sized,
Self::Output: IntoFuture,
Flatten out the execution of this future when the result itself
can be converted into another future. Read more
§fn race<F>(self, other: F) -> Race<Self, F>where
Self: Future + Sized,
F: Future<Output = Self::Output>,
fn race<F>(self, other: F) -> Race<Self, F>where
Self: Future + Sized,
F: Future<Output = Self::Output>,
Waits for one of two similarly-typed futures to complete. Read more
§fn try_race<F, T, E>(self, other: F) -> TryRace<Self, F>where
Self: Future<Output = Result<T, E>> + Sized,
F: Future<Output = Self::Output>,
fn try_race<F, T, E>(self, other: F) -> TryRace<Self, F>where
Self: Future<Output = Result<T, E>> + Sized,
F: Future<Output = Self::Output>,
Waits for one of two similarly-typed fallible futures to complete. Read more
§fn join<F>(self, other: F) -> Join<Self, F>where
Self: Future + Sized,
F: Future,
fn join<F>(self, other: F) -> Join<Self, F>where
Self: Future + Sized,
F: Future,
Waits for two similarly-typed futures to complete. Read more
§impl<F> FutureExt for Fwhere
F: Future + ?Sized,
impl<F> FutureExt for Fwhere
F: Future + ?Sized,
§fn catch_unwind(self) -> CatchUnwind<Self> ⓘwhere
Self: Sized + UnwindSafe,
fn catch_unwind(self) -> CatchUnwind<Self> ⓘwhere
Self: Sized + UnwindSafe,
Catches panics while polling the future. Read more
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
source§fn in_current_span(self) -> Instrumented<Self> ⓘ
fn in_current_span(self) -> Instrumented<Self> ⓘ
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
fn instrument(self, span: Span) -> Instrumented<Self> ⓘ
source§fn in_current_span(self) -> Instrumented<Self> ⓘ
fn in_current_span(self) -> Instrumented<Self> ⓘ
source§impl<F> IntoFuture for Fwhere
F: Future,
impl<F> IntoFuture for Fwhere
F: Future,
§type IntoFuture = F
type IntoFuture = F
Which kind of future are we turning this into?
source§fn into_future(self) -> <F as IntoFuture>::IntoFuture
fn into_future(self) -> <F as IntoFuture>::IntoFuture
Creates a future from a value. Read more
source§impl<T> IntoMustBoxFuture for Twhere
T: Future + ?Sized,
impl<T> IntoMustBoxFuture for Twhere
T: Future + ?Sized,
§impl<T> Pointable for T
impl<T> Pointable for T
§impl<SS, SP> SupersetOf<SS> for SPwhere
SS: SubsetOf<SP>,
impl<SS, SP> SupersetOf<SS> for SPwhere
SS: SubsetOf<SP>,
§fn to_subset(&self) -> Option<SS>
fn to_subset(&self) -> Option<SS>
The inverse inclusion map: attempts to construct
self
from the equivalent element of its
superset. Read more§fn is_in_subset(&self) -> bool
fn is_in_subset(&self) -> bool
Checks if
self
is actually part of its subset T
(and can be converted to it).§fn to_subset_unchecked(&self) -> SS
fn to_subset_unchecked(&self) -> SS
Use with care! Same as
self.to_subset
but without any property checks. Always succeeds.§fn from_subset(element: &SS) -> SP
fn from_subset(element: &SS) -> SP
The inclusion map: converts
self
to the equivalent element of its superset.§impl<T> Upcastable for Twhere
T: 'static + Any + Send + Sync,
impl<T> Upcastable for Twhere
T: 'static + Any + Send + Sync,
§fn upcast_any_ref(&self) -> &(dyn Any + 'static)
fn upcast_any_ref(&self) -> &(dyn Any + 'static)
upcast ref
§fn upcast_any_mut(&mut self) -> &mut (dyn Any + 'static)
fn upcast_any_mut(&mut self) -> &mut (dyn Any + 'static)
upcast mut ref