#![cfg_attr(feature = "fail-on-warnings", deny(warnings))]
#![warn(clippy::all, clippy::pedantic, clippy::nursery, clippy::cargo)]
#![allow(clippy::multiple_crate_versions)]
#![allow(clippy::result_large_err)]
pub mod context_state;
pub use context_state::ContextState;
use bmux_clients_plugin_api::clients_state::{
self as clients_state, ClientQueryError, ClientSummary,
};
use bmux_context_state::{
ContextSelector as PrimitiveContextSelector, ContextStateHandle, ContextStateReader,
ContextStateSnapshot, ContextStateWriter, ContextSummary as PrimitiveContextSummary,
RuntimeContext,
};
use bmux_contexts_plugin_api::contexts_commands::{
self, CloseContextError, ContextAck, ContextsCommandsService, CreateContextError,
RenameContextError, SelectContextError,
};
use bmux_contexts_plugin_api::contexts_events::{self, ContextEvent};
use bmux_contexts_plugin_api::contexts_state::{
self, ContextQueryError, ContextSelector as StateContextSelector, ContextSummary,
ContextsStateService,
};
use bmux_plugin::{
HostRuntimeApi, ServiceCaller, TypedServiceCaller, global_event_bus,
global_plugin_state_registry,
};
use bmux_plugin_sdk::prelude::*;
use bmux_plugin_sdk::{
PluginEventKind, StatefulPlugin, StatefulPluginError, StatefulPluginHandle,
StatefulPluginResult, StatefulPluginSnapshot, TypedServiceRegistrationContext,
TypedServiceRegistry,
};
use bmux_session_models::{ClientId, SessionId};
use bmux_sessions_plugin_api::sessions_commands::{self, RenameSessionError};
use bmux_sessions_plugin_api::sessions_state::SessionSelector;
use bmux_snapshot_runtime::StatefulPluginRegistry;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use tracing::instrument;
use uuid::Uuid;
/// Bridges the plugin-owned [`ContextState`] to the `ContextStateReader` and
/// `ContextStateWriter` traits by taking the shared lock around every call.
struct ContextStateAdapter {
    /// Lock-protected state that every trait method delegates to.
    inner: Arc<RwLock<ContextState>>,
}

impl ContextStateAdapter {
    /// Runs `f` against the state under the read lock.
    ///
    /// Returns `fallback` instead when the lock is poisoned.
    fn with_read<T>(&self, f: impl FnOnce(&ContextState) -> T, fallback: T) -> T {
        let Ok(state) = self.inner.read() else {
            return fallback;
        };
        f(&state)
    }

    /// Runs `f` against the state under the write lock.
    ///
    /// Returns `fallback` instead when the lock is poisoned.
    fn with_write<T>(&self, f: impl FnOnce(&mut ContextState) -> T, fallback: T) -> T {
        let Ok(mut state) = self.inner.write() else {
            return fallback;
        };
        f(&mut state)
    }
}
/// Read-only queries, each answered under the shared read lock.
///
/// When the lock is poisoned the queries degrade to an empty list, `None`, or
/// an `Err` carrying a fixed message rather than panicking.
impl ContextStateReader for ContextStateAdapter {
    /// Lists every known context.
    fn list(&self) -> Vec<PrimitiveContextSummary> {
        let when_poisoned = Vec::<PrimitiveContextSummary>::new();
        self.with_read(ContextState::list, when_poisoned)
    }

    /// Returns the context currently selected by `client_id`, if any.
    fn current_for_client(&self, client_id: ClientId) -> Option<PrimitiveContextSummary> {
        self.with_read(|ctx_state| ctx_state.current_for_client(client_id), None)
    }

    /// Returns the session reported by the state as current for `client_id`.
    fn current_session_for_client(&self, client_id: ClientId) -> Option<SessionId> {
        self.with_read(
            |ctx_state| ctx_state.current_session_for_client(client_id),
            None,
        )
    }

    /// Returns the id of the context associated with `session_id`, if any.
    fn context_for_session(&self, session_id: SessionId) -> Option<Uuid> {
        self.with_read(|ctx_state| ctx_state.context_for_session(session_id), None)
    }

    /// Resolves `selector` to a context id.
    fn resolve_id(
        &self,
        selector: &PrimitiveContextSelector,
    ) -> std::result::Result<Uuid, &'static str> {
        let when_poisoned = Err("context-state lock poisoned");
        self.with_read(|ctx_state| ctx_state.resolve_id(selector), when_poisoned)
    }
}
/// Mutating operations, each applied under the shared write lock
/// (`snapshot` only needs the read lock).
///
/// A poisoned lock never panics: fallible methods return
/// `Err("context-state lock poisoned")`, the others return an empty/neutral
/// value as documented per method.
impl ContextStateWriter for ContextStateAdapter {
    /// Creates a context for `client_id`.
    ///
    /// If the lock is poisoned, returns a placeholder summary with a nil id
    /// that echoes `name` and `attributes` back to the caller.
    fn create(
        &self,
        client_id: ClientId,
        name: Option<String>,
        attributes: BTreeMap<String, String>,
    ) -> PrimitiveContextSummary {
        // Lock directly instead of going through `with_write`: that helper
        // needs its fallback built up front, which would clone `name` and
        // `attributes` on every call even though the fallback is only needed
        // when the lock is poisoned.
        match self.inner.write() {
            Ok(mut state) => state.create(client_id, name, attributes),
            Err(_) => PrimitiveContextSummary {
                id: Uuid::nil(),
                name,
                attributes,
            },
        }
    }

    /// Selects the context matching `selector` for `client_id`.
    fn select_for_client(
        &self,
        client_id: ClientId,
        selector: &PrimitiveContextSelector,
    ) -> std::result::Result<PrimitiveContextSummary, &'static str> {
        self.with_write(
            |state| state.select_for_client(client_id, selector),
            Err("context-state lock poisoned"),
        )
    }

    /// Renames the context matching `selector` to `name`.
    fn rename(
        &self,
        selector: &PrimitiveContextSelector,
        name: String,
    ) -> std::result::Result<PrimitiveContextSummary, &'static str> {
        self.with_write(
            |state| state.rename(selector, name),
            Err("context-state lock poisoned"),
        )
    }

    /// Closes the context matching `selector` on behalf of `client_id`.
    ///
    /// On success yields the closed context id and the session id reported by
    /// the state for it, if any.
    fn close(
        &self,
        client_id: ClientId,
        selector: &PrimitiveContextSelector,
        force: bool,
    ) -> std::result::Result<(Uuid, Option<SessionId>), &'static str> {
        self.with_write(
            |state| state.close(client_id, selector, force),
            Err("context-state lock poisoned"),
        )
    }

    /// Removes the contexts associated with `session_id` and returns their ids
    /// (empty when the lock is poisoned).
    fn remove_contexts_for_session(&self, session_id: SessionId) -> Vec<Uuid> {
        self.with_write(
            |state| state.remove_contexts_for_session(session_id),
            Vec::new(),
        )
    }

    /// Binds `session_id` to `context_id`.
    fn bind_session(
        &self,
        context_id: Uuid,
        session_id: SessionId,
    ) -> std::result::Result<(), &'static str> {
        self.with_write(
            |state| state.bind_session(context_id, session_id),
            Err("context-state lock poisoned"),
        )
    }

    /// Forwards a client disconnect to the state; a no-op when the lock is
    /// poisoned.
    fn disconnect_client(&self, client_id: ClientId) {
        self.with_write(|state| state.disconnect_client(client_id), ());
    }

    /// Removes the context with id `context_id`, forwarding `preferred_client`
    /// to the state. Returns `None` when the lock is poisoned.
    fn remove_context_by_id(
        &self,
        context_id: Uuid,
        preferred_client: Option<ClientId>,
    ) -> Option<(Uuid, Option<SessionId>)> {
        self.with_write(
            |state| state.remove_context_by_id(context_id, preferred_client),
            None,
        )
    }

    /// Copies the full state into a `ContextStateSnapshot`.
    ///
    /// Returns the default (empty) snapshot when the lock is poisoned.
    fn snapshot(&self) -> ContextStateSnapshot {
        self.with_read(
            |state| {
                // Rebuild each entry field by field so the snapshot owns its
                // own copies of the name and attributes.
                let contexts = state
                    .contexts
                    .iter()
                    .map(|(id, rc)| {
                        (
                            *id,
                            RuntimeContext {
                                id: rc.id,
                                name: rc.name.clone(),
                                attributes: rc.attributes.clone(),
                            },
                        )
                    })
                    .collect();
                ContextStateSnapshot {
                    contexts,
                    session_by_context: state.session_by_context.clone(),
                    selected_by_client: state.selected_by_client.clone(),
                    mru_contexts: state.mru_contexts.clone(),
                }
            },
            ContextStateSnapshot::default(),
        )
    }

    /// Replaces every collection in the state with the contents of `snapshot`.
    ///
    /// A no-op when the lock is poisoned.
    fn restore_snapshot(&self, snapshot: ContextStateSnapshot) {
        self.with_write(
            |state| {
                state.contexts = snapshot.contexts;
                state.session_by_context = snapshot.session_by_context;
                state.selected_by_client = snapshot.selected_by_client;
                state.mru_contexts = snapshot.mru_contexts;
            },
            (),
        );
    }
}
/// Identifier reported by `ContextsStatefulPlugin::id` and stamped on every
/// snapshot it produces.
const CONTEXTS_STATEFUL_ID: PluginEventKind =
PluginEventKind::from_static("bmux.contexts/context-state");
/// Snapshot format version written by `snapshot`; `restore_snapshot` rejects
/// snapshots carrying any other version.
const CONTEXTS_STATEFUL_VERSION: u32 = 1;
/// Snapshot/restore participant that serializes the context state as JSON.
struct ContextsStatefulPlugin {
    /// Writer used both to capture and to re-apply snapshots.
    writer: Arc<dyn ContextStateWriter>,
}

impl StatefulPlugin for ContextsStatefulPlugin {
    /// Returns the fixed identifier of this stateful participant.
    fn id(&self) -> PluginEventKind {
        CONTEXTS_STATEFUL_ID
    }

    /// Captures the current context state and encodes it as JSON bytes.
    ///
    /// # Errors
    ///
    /// `StatefulPluginError::SnapshotFailed` when JSON encoding fails.
    fn snapshot(&self) -> StatefulPluginResult<StatefulPluginSnapshot> {
        let captured = self.writer.snapshot();
        match serde_json::to_vec(&captured) {
            Ok(encoded) => Ok(StatefulPluginSnapshot::new(
                CONTEXTS_STATEFUL_ID,
                CONTEXTS_STATEFUL_VERSION,
                encoded,
            )),
            Err(err) => Err(StatefulPluginError::SnapshotFailed {
                plugin: CONTEXTS_STATEFUL_ID.as_str().to_string(),
                details: err.to_string(),
            }),
        }
    }

    /// Decodes `snapshot` and applies it through the writer.
    ///
    /// # Errors
    ///
    /// * `StatefulPluginError::UnsupportedVersion` when the snapshot version
    ///   differs from `CONTEXTS_STATEFUL_VERSION`.
    /// * `StatefulPluginError::RestoreFailed` when the payload is not valid
    ///   JSON for a `ContextStateSnapshot`.
    fn restore_snapshot(&self, snapshot: StatefulPluginSnapshot) -> StatefulPluginResult<()> {
        // Check the version before touching the payload.
        let found_version = snapshot.version;
        if found_version != CONTEXTS_STATEFUL_VERSION {
            return Err(StatefulPluginError::UnsupportedVersion {
                plugin: CONTEXTS_STATEFUL_ID.as_str().to_string(),
                version: found_version,
                expected: vec![CONTEXTS_STATEFUL_VERSION],
            });
        }
        let parsed: std::result::Result<ContextStateSnapshot, _> =
            serde_json::from_slice(&snapshot.bytes);
        match parsed {
            Ok(decoded) => {
                self.writer.restore_snapshot(decoded);
                Ok(())
            }
            Err(err) => Err(StatefulPluginError::RestoreFailed {
                plugin: CONTEXTS_STATEFUL_ID.as_str().to_string(),
                details: err.to_string(),
            }),
        }
    }
}
/// Request payload decoded for the `contexts-commands` / `create-context`
/// service route.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CreateContextArgs {
/// Optional display name; defaults to `None` when absent.
#[serde(default)]
name: Option<String>,
/// Key/value attributes forwarded to context creation.
attributes: BTreeMap<String, String>,
}
/// Request payload decoded for the `get-context` and `select-context` service
/// routes.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct SelectorArgs {
/// Identifies the target context by id or name.
selector: WireSelector,
}
/// Request payload decoded for the `contexts-commands` / `close-context`
/// service route.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CloseContextArgs {
/// Identifies the context to close by id or name.
selector: WireSelector,
/// Forwarded unchanged to the state's `close` operation.
force: bool,
}
/// Request payload decoded for the `contexts-commands` / `rename-context`
/// service route.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RenameContextArgs {
/// Identifies the context to rename by id or name.
selector: WireSelector,
/// Requested new name; it is trimmed and must be non-empty after trimming.
name: String,
}
/// Selector as it arrives in service requests: an id, a name, both, or neither.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WireSelector {
    /// Context id; takes precedence over `name` when both are present.
    #[serde(default)]
    id: Option<::uuid::Uuid>,
    /// Context name, consulted only when `id` is absent.
    #[serde(default)]
    name: Option<String>,
}

impl WireSelector {
    /// Converts to the primitive selector type.
    ///
    /// Yields `ById` when an id is present, otherwise `ByName` when a name is
    /// present, and `None` when the selector carries neither.
    fn to_primitive(&self) -> Option<PrimitiveContextSelector> {
        match (self.id, self.name.as_ref()) {
            (Some(id), _) => Some(PrimitiveContextSelector::ById(id)),
            (None, Some(name)) => Some(PrimitiveContextSelector::ByName(name.clone())),
            (None, None) => None,
        }
    }
}
/// Plugin entry type for the contexts plugin; it carries no fields of its own.
/// State is published through the global plugin state registry in `activate`.
#[derive(Default)]
pub struct ContextsPlugin;
impl RustPlugin for ContextsPlugin {
type Contract = bmux_contexts_plugin_api::Contract;
/// Creates the context state and publishes it for this process.
///
/// Registers, in order: the raw `ContextState`, a `ContextStateHandle`
/// wrapping an adapter over that same state, a stateful-plugin handle for
/// snapshot/restore, and the `ContextEvent` channel on the global event bus.
///
/// # Panics
///
/// Panics if the freshly created handle lock or the stateful plugin
/// registry lock is poisoned.
fn activate(
&mut self,
_context: NativeLifecycleContext,
) -> std::result::Result<i32, PluginCommandError> {
let state: Arc<RwLock<ContextState>> = Arc::new(RwLock::new(ContextState::default()));
global_plugin_state_registry().register::<ContextState>(&state);
// The adapter shares the same `Arc`, so both registrations see one state.
let adapter = ContextStateAdapter {
inner: Arc::clone(&state),
};
let handle = Arc::new(RwLock::new(ContextStateHandle::new(adapter)));
global_plugin_state_registry().register::<ContextStateHandle>(&handle);
// Clone the writer out of the handle inside a short scope so the read
// guard is released before the rest of activation runs.
let writer_for_snapshot: Arc<dyn ContextStateWriter> = {
let guard = handle
.read()
.expect("freshly-created ContextStateHandle lock is poisoned");
Arc::clone(&guard.0)
};
let stateful = StatefulPluginHandle::new(ContextsStatefulPlugin {
writer: writer_for_snapshot,
});
let registry = global_plugin_state_registry();
// Obtain the stateful registry via the lookup closure, with a second
// closure that registers a freshly created one.
let stateful_registry = bmux_snapshot_runtime::get_or_init_stateful_registry(
|| registry.get::<StatefulPluginRegistry>(),
|fresh| {
registry.register::<StatefulPluginRegistry>(fresh);
},
);
stateful_registry
.write()
.expect("stateful plugin registry lock poisoned")
.push(stateful);
global_event_bus().register_channel::<ContextEvent>(contexts_events::EVENT_KIND);
Ok(bmux_plugin_sdk::EXIT_OK)
}
/// This plugin defines no commands: every invocation is answered with an
/// unknown-command error (built with an empty command name).
fn run_command(
&mut self,
_context: NativeCommandContext,
) -> std::result::Result<i32, PluginCommandError> {
Err(PluginCommandError::unknown_command(""))
}
/// Routes a service call to the matching `*_local` function.
///
/// The `contexts-state` routes turn a local failure into a
/// `ServiceResponse::error`; the `contexts-commands` routes always answer
/// `Ok` and carry the domain `Result` as the payload.
fn invoke_service(&self, context: NativeServiceContext) -> ServiceResponse {
bmux_plugin_sdk::route_service!(context, {
"contexts-state", "list-contexts" => |_req: (), ctx| {
list_contexts_local(ctx)
.map_err(|e| ServiceResponse::error("list_failed", e))
},
"contexts-state", "get-context" => |req: SelectorArgs, ctx| {
get_context_local(ctx, &req.selector)
.map_err(|e| ServiceResponse::error("get_failed", e))
},
"contexts-state", "current-context" => |_req: (), ctx| {
current_context_local(ctx, ctx.caller_client_id)
.map_err(|e| ServiceResponse::error("current_failed", e))
},
"contexts-commands", "create-context" => |req: CreateContextArgs, ctx| {
Ok::<Result<ContextAck, CreateContextError>, ServiceResponse>(
create_context_local(ctx, ctx.caller_client_id, req.name, req.attributes)
)
},
"contexts-commands", "select-context" => |req: SelectorArgs, ctx| {
Ok::<Result<ContextAck, SelectContextError>, ServiceResponse>(
select_context_local(ctx, ctx.caller_client_id, &req.selector)
)
},
"contexts-commands", "close-context" => |req: CloseContextArgs, ctx| {
Ok::<Result<ContextAck, CloseContextError>, ServiceResponse>(
close_context_local(ctx, ctx.caller_client_id, &req.selector, req.force)
)
},
"contexts-commands", "rename-context" => |req: RenameContextArgs, ctx| {
Ok::<Result<ContextAck, RenameContextError>, ServiceResponse>(
rename_context_local(ctx, &req.selector, &req.name)
)
},
})
}
/// Registers the typed `contexts-state` and `contexts-commands` providers,
/// both backed by one shared `TypedServiceCaller`.
///
/// The results of both `register_provider` calls are discarded.
fn register_typed_services(
&self,
context: TypedServiceRegistrationContext<'_>,
registry: &mut TypedServiceRegistry,
) {
let caller = Arc::new(TypedServiceCaller::from_registration_context(&context));
let state: Arc<dyn ContextsStateService + Send + Sync> =
Arc::new(ContextsStateHandle::new(Arc::clone(&caller)));
let _ = contexts_state::register_provider(registry, state);
let commands: Arc<dyn ContextsCommandsService + Send + Sync> =
Arc::new(ContextsCommandsHandle::new(caller));
let _ = contexts_commands::register_provider(registry, commands);
}
}
/// Looks up the `ContextState` registered in this process.
///
/// # Errors
///
/// Returns a descriptive message when no `ContextState` has been registered
/// in the global plugin state registry.
fn local_state() -> Result<Arc<RwLock<ContextState>>, String> {
    let Some(state) = global_plugin_state_registry().get::<ContextState>() else {
        return Err(
            "contexts-plugin: ContextState not registered in this process \
             (activate did not run here; typed dispatch should forward to the \
             process that owns the activated provider)"
                .to_string(),
        );
    };
    Ok(state)
}
/// Wraps `caller` in a `ServiceCallerDispatchClient` for use with the
/// generated typed service clients.
const fn dispatch_client<C>(caller: &C) -> bmux_plugin::ServiceCallerDispatchClient<'_, C>
where
    C: ServiceCaller + Sync + ?Sized,
{
    bmux_plugin::ServiceCallerDispatchClient::new(caller)
}
/// Determines which client a request is acting for.
///
/// Uses `caller_client_id` when it is supplied; otherwise issues the
/// `current-client` query through `caller`.
///
/// # Errors
///
/// Returns a message when the service call itself fails or when the query
/// answers with a `ClientQueryError`.
fn resolve_caller_client_id(
    caller: &impl ServiceCaller,
    caller_client_id: Option<::uuid::Uuid>,
) -> Result<ClientId, String> {
    type CurrentClientReply = std::result::Result<ClientSummary, ClientQueryError>;

    if let Some(explicit_id) = caller_client_id {
        return Ok(ClientId(explicit_id));
    }
    let reply = caller.call_service::<(), CurrentClientReply>(
        bmux_clients_plugin_api::capabilities::CLIENTS_READ.as_str(),
        ServiceKind::Query,
        clients_state::INTERFACE_ID.as_str(),
        "current-client",
        &(),
    );
    // Outer error: transport/dispatch failure. Inner error: the query's own.
    let summary = reply
        .map_err(|err| err.to_string())?
        .map_err(|err| format!("current-client query failed: {err:?}"))?;
    Ok(ClientId(summary.id))
}
/// Lists every context held by this process's state, converted to the typed
/// API summary.
///
/// # Errors
///
/// Returns a message when the state is not registered here or its lock is
/// poisoned.
fn list_contexts_local(_caller: &impl ServiceCaller) -> Result<Vec<ContextSummary>, String> {
    let shared = local_state()?;
    let Ok(state) = shared.read() else {
        return Err("context state lock poisoned".to_string());
    };
    Ok(state.list().into_iter().map(ipc_summary_to_typed).collect())
}
#[allow(clippy::significant_drop_tightening)]
fn get_context_local(
_caller: &impl ServiceCaller,
selector: &WireSelector,
) -> Result<Result<ContextSummary, ContextQueryError>, String> {
let Some(context_selector) = selector.to_primitive() else {
return Ok(Err(ContextQueryError::InvalidSelector {
reason: "selector must specify either id or name".to_string(),
}));
};
let state = local_state()?;
let guard = state
.read()
.map_err(|_| "context state lock poisoned".to_string())?;
let resolved = guard.resolve_id(&context_selector);
Ok(resolved.map_or(Err(ContextQueryError::NotFound), |id| {
guard
.contexts
.get(&id)
.map(ContextState::to_summary)
.map(ipc_summary_to_typed)
.ok_or(ContextQueryError::NotFound)
}))
}
/// Returns the context currently selected by the calling client, if any.
///
/// # Errors
///
/// Returns a message when the client cannot be resolved, the state is not
/// registered in this process, or its lock is poisoned.
fn current_context_local(
    caller: &impl ServiceCaller,
    caller_client_id: Option<::uuid::Uuid>,
) -> Result<Option<ContextSummary>, String> {
    let client_id = resolve_caller_client_id(caller, caller_client_id)?;
    let shared = local_state()?;
    let Ok(state) = shared.read() else {
        return Err("context state lock poisoned".to_string());
    };
    let current = state.current_for_client(client_id);
    Ok(current.map(ipc_summary_to_typed))
}
#[instrument(
level = "debug",
target = "bmux_contexts_plugin::lifecycle",
skip_all,
fields(
caller_client_id = ?caller_client_id,
name = %name.as_deref().unwrap_or("<unnamed>"),
context_id = tracing::field::Empty,
session_id = tracing::field::Empty,
),
)]
fn create_context_local(
caller: &(impl ServiceCaller + Sync),
caller_client_id: Option<::uuid::Uuid>,
name: Option<String>,
attributes: BTreeMap<String, String>,
) -> Result<ContextAck, CreateContextError> {
let client_id = resolve_caller_client_id(caller, caller_client_id)
.map_err(|reason| CreateContextError::Failed { reason })?;
tracing::debug!(
target: "bmux_contexts_plugin::lifecycle",
client_id = %client_id.0,
name = %name.as_deref().unwrap_or("<unnamed>"),
"create_context_local begin",
);
let session_id = create_session_via_sessions_plugin(caller, name.clone())?;
let (context_summary, bind_result) =
mutate_state_create(client_id, name.clone(), attributes, session_id)?;
if let Err(reason) = bind_result {
return Err(CreateContextError::Failed {
reason: reason.to_string(),
});
}
if let Err(reason) = select_session_via_sessions_plugin(caller, session_id) {
let _ = caller.log_write(&bmux_plugin_sdk::LogWriteRequest {
level: bmux_plugin_sdk::LogWriteLevel::Warn,
message: format!(
"contexts.create_context: failed to select created session (context_id={} session_id={}): {reason}",
context_summary.id, session_id.0,
),
target: Some("bmux.contexts".to_string()),
});
}
tracing::Span::current().record("context_id", tracing::field::display(context_summary.id));
tracing::Span::current().record("session_id", tracing::field::display(session_id.0));
let _ = global_event_bus().emit(
&contexts_events::EVENT_KIND,
ContextEvent::Created {
context_id: context_summary.id,
name,
},
);
let _ = global_event_bus().emit(
&contexts_events::EVENT_KIND,
ContextEvent::Selected {
context_id: context_summary.id,
},
);
let _ = global_event_bus().emit(
&contexts_events::EVENT_KIND,
ContextEvent::SessionActiveContextChanged {
session_id: session_id.0,
context_id: context_summary.id,
initiator_client_id: Some(client_id.0),
},
);
Ok(ContextAck {
id: context_summary.id,
session_id: Some(session_id.0),
})
}
/// Creates the context and binds `session_id` to it under one write lock.
///
/// The outer `Result` fails when the state is unavailable or its lock is
/// poisoned. On success the tuple carries the created context's summary plus
/// the outcome of the bind step, which the caller inspects separately.
#[allow(clippy::significant_drop_tightening)]
fn mutate_state_create(
    client_id: ClientId,
    name: Option<String>,
    attributes: BTreeMap<String, String>,
    session_id: SessionId,
) -> Result<
    (
        PrimitiveContextSummary,
        core::result::Result<(), &'static str>,
    ),
    CreateContextError,
> {
    let shared = match local_state() {
        Ok(shared) => shared,
        Err(reason) => return Err(CreateContextError::Failed { reason }),
    };
    let Ok(mut state) = shared.write() else {
        return Err(CreateContextError::Failed {
            reason: "context state lock poisoned".to_string(),
        });
    };
    let created = state.create(client_id, name, attributes);
    let bound = state.bind_session(created.id, session_id);
    Ok((created, bound))
}
#[instrument(
level = "debug",
target = "bmux_contexts_plugin::lifecycle",
skip_all,
fields(
caller_client_id = ?caller_client_id,
context_id = tracing::field::Empty,
session_id = tracing::field::Empty,
),
)]
fn select_context_local(
caller: &(impl ServiceCaller + Sync),
caller_client_id: Option<::uuid::Uuid>,
selector: &WireSelector,
) -> Result<ContextAck, SelectContextError> {
let Some(context_selector) = selector.to_primitive() else {
return Err(SelectContextError::Denied {
reason: "selector must specify either id or name".to_string(),
});
};
let client_id = resolve_caller_client_id(caller, caller_client_id)
.map_err(|reason| SelectContextError::Denied { reason })?;
let (context, session_after_select) = mutate_state_select(client_id, &context_selector)?;
tracing::Span::current().record("context_id", tracing::field::display(context.id));
if let Some(session_id) = session_after_select {
tracing::Span::current().record("session_id", tracing::field::display(session_id.0));
}
if let Some(session_id) = session_after_select {
let _ = select_session_via_sessions_plugin(caller, session_id);
}
let _ = global_event_bus().emit(
&contexts_events::EVENT_KIND,
ContextEvent::Selected {
context_id: context.id,
},
);
if let Some(session_id) = session_after_select {
let _ = global_event_bus().emit(
&contexts_events::EVENT_KIND,
ContextEvent::SessionActiveContextChanged {
session_id: session_id.0,
context_id: context.id,
initiator_client_id: Some(client_id.0),
},
);
}
Ok(ContextAck {
id: context.id,
session_id: session_after_select.map(|s| s.0),
})
}
/// Applies the selection under the write lock and reads back the client's
/// current session in the same critical section.
///
/// # Errors
///
/// `SelectContextError::Denied` when the state is unavailable, its lock is
/// poisoned, or the state rejects the selection.
#[allow(clippy::significant_drop_tightening)]
fn mutate_state_select(
    client_id: ClientId,
    context_selector: &PrimitiveContextSelector,
) -> Result<(PrimitiveContextSummary, Option<SessionId>), SelectContextError> {
    let shared = match local_state() {
        Ok(shared) => shared,
        Err(reason) => return Err(SelectContextError::Denied { reason }),
    };
    let Ok(mut state) = shared.write() else {
        return Err(SelectContextError::Denied {
            reason: "context state lock poisoned".to_string(),
        });
    };
    let selected = match state.select_for_client(client_id, context_selector) {
        Ok(selected) => selected,
        Err(reason) => {
            return Err(SelectContextError::Denied {
                reason: reason.to_string(),
            });
        }
    };
    let current_session = state.current_session_for_client(client_id);
    Ok((selected, current_session))
}
#[instrument(
level = "debug",
target = "bmux_contexts_plugin::lifecycle",
skip_all,
fields(
caller_client_id = ?caller_client_id,
force,
removed_id = tracing::field::Empty,
replacement_context_id = tracing::field::Empty,
replacement_session_id = tracing::field::Empty,
),
)]
fn close_context_local(
caller: &(impl ServiceCaller + Sync),
caller_client_id: Option<::uuid::Uuid>,
selector: &WireSelector,
force: bool,
) -> Result<ContextAck, CloseContextError> {
let Some(context_selector) = selector.to_primitive() else {
return Err(CloseContextError::Failed {
reason: "selector must specify either id or name".to_string(),
});
};
let client_id = resolve_caller_client_id(caller, caller_client_id)
.map_err(|reason| CloseContextError::Failed { reason })?;
let CloseOutcome {
removed_id,
bound_session_id,
replacement,
} = mutate_state_close(client_id, &context_selector, force)?;
tracing::Span::current().record("removed_id", tracing::field::display(removed_id));
if let Some((replacement_context_id, replacement_session_id)) = replacement {
tracing::Span::current().record(
"replacement_context_id",
tracing::field::display(replacement_context_id),
);
if let Some(session_id) = replacement_session_id {
tracing::Span::current().record(
"replacement_session_id",
tracing::field::display(session_id.0),
);
}
}
if let Some(session_id) = bound_session_id {
let _ = kill_session_via_sessions_plugin(caller, session_id);
}
let _ = global_event_bus().emit(
&contexts_events::EVENT_KIND,
ContextEvent::Closed {
context_id: removed_id,
},
);
if let Some((replacement_context_id, replacement_session_id)) = replacement {
if let Some(session_id) = replacement_session_id
&& let Err(reason) = select_session_via_sessions_plugin(caller, session_id)
{
let _ = caller.log_write(&bmux_plugin_sdk::LogWriteRequest {
level: bmux_plugin_sdk::LogWriteLevel::Warn,
message: format!(
"contexts.close_context: failed to select sibling session (context_id={replacement_context_id} session_id={}): {reason}",
session_id.0,
),
target: Some("bmux.contexts".to_string()),
});
}
let _ = global_event_bus().emit(
&contexts_events::EVENT_KIND,
ContextEvent::Selected {
context_id: replacement_context_id,
},
);
if let Some(session_id) = replacement_session_id {
let _ = global_event_bus().emit(
&contexts_events::EVENT_KIND,
ContextEvent::SessionActiveContextChanged {
session_id: session_id.0,
context_id: replacement_context_id,
initiator_client_id: Some(client_id.0),
},
);
}
}
Ok(ContextAck {
id: removed_id,
session_id: bound_session_id.map(|s| s.0),
})
}
/// Result of closing a context in local state, as produced by
/// `mutate_state_close`.
struct CloseOutcome {
/// Id of the context that was closed.
removed_id: ::uuid::Uuid,
/// Session id returned by the state's `close` for the closed context, if any.
bound_session_id: Option<SessionId>,
/// Context selected by the client after the close, paired with the session
/// bound to that context (if any); `None` when the client has no selection.
replacement: Option<(::uuid::Uuid, Option<SessionId>)>,
}
/// Normalizes a requested context name by trimming surrounding whitespace.
///
/// # Errors
///
/// `RenameContextError::InvalidName` when nothing remains after trimming.
fn validate_context_rename_name(name: &str) -> Result<String, RenameContextError> {
    match name.trim() {
        "" => Err(RenameContextError::InvalidName {
            reason: "name must not be empty".to_string(),
        }),
        usable => Ok(usable.to_string()),
    }
}
/// Renames a context and, when a session is bound to it, that session too.
///
/// The context is renamed in local state first. If the bound session cannot
/// be renamed through the sessions plugin, the context's previous name is
/// restored and the call fails. A `Renamed` event is emitted on success.
///
/// # Errors
///
/// * `RenameContextError::Denied` when the selector names neither id nor name.
/// * `RenameContextError::InvalidName` when `name` is empty after trimming.
/// * `RenameContextError::NotFound` / `Failed` from the state mutation.
/// * `RenameContextError::Failed` when renaming the bound session fails.
#[instrument(
    level = "debug",
    target = "bmux_contexts_plugin::lifecycle",
    skip_all,
    fields(
        context_id = tracing::field::Empty,
        session_id = tracing::field::Empty,
        name = %name,
    ),
)]
fn rename_context_local(
    caller: &(impl ServiceCaller + Sync),
    selector: &WireSelector,
    name: &str,
) -> Result<ContextAck, RenameContextError> {
    let Some(context_selector) = selector.to_primitive() else {
        return Err(RenameContextError::Denied {
            reason: "selector must specify either id or name".to_string(),
        });
    };
    let new_name = validate_context_rename_name(name)?;
    let mutation = mutate_state_rename(&context_selector, new_name.clone())?;
    let context_id = mutation.context_id;
    let bound_session = mutation.session_id;
    tracing::Span::current().record("context_id", tracing::field::display(context_id));
    if let Some(session_id) = bound_session {
        tracing::Span::current().record("session_id", tracing::field::display(session_id.0));
        let session_rename = rename_session_via_sessions_plugin(caller, session_id, new_name.clone());
        if let Err(reason) = session_rename {
            // Keep context and session names in step: undo the local rename.
            rollback_context_rename(context_id, mutation.old_name);
            return Err(RenameContextError::Failed { reason });
        }
    }
    let _ = global_event_bus().emit(
        &contexts_events::EVENT_KIND,
        ContextEvent::Renamed {
            context_id,
            name: new_name,
        },
    );
    Ok(ContextAck {
        id: context_id,
        session_id: bound_session.map(|id| id.0),
    })
}
/// What `mutate_state_rename` changed, including what is needed to undo it.
struct RenameContextMutation {
/// Id of the renamed context.
context_id: Uuid,
/// Name the context had before the rename; used for rollback.
old_name: Option<String>,
/// Session bound to the context at the time of the rename, if any.
session_id: Option<SessionId>,
}
/// Renames the selected context in local state and reports what changed.
///
/// The previous name is returned so the caller can roll the rename back, and
/// the bound session id (if any) so the caller can rename that session too.
///
/// # Errors
///
/// * `RenameContextError::Failed` when the state is unavailable or its lock is
///   poisoned.
/// * `RenameContextError::NotFound` when the selector cannot be resolved or
///   the resolved id has no entry.
#[allow(clippy::significant_drop_tightening)]
fn mutate_state_rename(
    context_selector: &PrimitiveContextSelector,
    name: String,
) -> Result<RenameContextMutation, RenameContextError> {
    let state = local_state().map_err(|reason| RenameContextError::Failed { reason })?;
    let mut guard = state.write().map_err(|_| RenameContextError::Failed {
        reason: "context state lock poisoned".to_string(),
    })?;
    let context_id = guard
        .resolve_id(context_selector)
        .map_err(|_| RenameContextError::NotFound)?;
    let Some(context) = guard.contexts.get_mut(&context_id) else {
        return Err(RenameContextError::NotFound);
    };
    // `Option::replace` installs the new name and hands back the old one by
    // move, so the previous name does not have to be cloned first.
    let old_name = context.name.replace(name);
    let session_id = guard.session_by_context.get(&context_id).copied();
    Ok(RenameContextMutation {
        context_id,
        old_name,
        session_id,
    })
}
fn rollback_context_rename(context_id: Uuid, old_name: Option<String>) {
let Ok(state) = local_state() else {
return;
};
let Ok(mut guard) = state.write() else {
return;
};
if let Some(context) = guard.contexts.get_mut(&context_id) {
context.name = old_name;
}
}
/// Renames session `session_id` to `name` through the sessions plugin's typed
/// `rename-session` command, blocking until the dispatch completes.
///
/// # Errors
///
/// Returns a message when the dispatch itself fails or when the sessions
/// plugin rejects the rename.
fn rename_session_via_sessions_plugin(
    caller: &(impl ServiceCaller + Sync),
    session_id: SessionId,
    name: String,
) -> Result<(), String> {
    let target = SessionSelector {
        id: Some(session_id.0),
        name: None,
    };
    let mut client = dispatch_client(caller);
    let request = sessions_commands::client::rename_session(&mut client, target, name);
    let outcome = bmux_plugin::block_on_typed_dispatch(request)
        .map_err(|err| format!("sessions-commands/rename-session failed: {err}"))?;
    match outcome {
        Ok(_) => Ok(()),
        Err(RenameSessionError::NotFound) => Err("session not found".to_string()),
        Err(
            RenameSessionError::InvalidName { reason } | RenameSessionError::Failed { reason },
        ) => Err(reason),
    }
}
/// Closes the selected context in local state and captures, under the same
/// write lock, which context the client has selected afterwards.
///
/// # Errors
///
/// * `CloseContextError::Failed` when the state is unavailable or its lock is
///   poisoned.
/// * `CloseContextError::NotFound` for any error reported by the state's
///   `close` (the underlying reason is discarded).
#[allow(clippy::significant_drop_tightening)]
fn mutate_state_close(
    client_id: ClientId,
    context_selector: &PrimitiveContextSelector,
    force: bool,
) -> Result<CloseOutcome, CloseContextError> {
    let shared = match local_state() {
        Ok(shared) => shared,
        Err(reason) => return Err(CloseContextError::Failed { reason }),
    };
    let Ok(mut state) = shared.write() else {
        return Err(CloseContextError::Failed {
            reason: "context state lock poisoned".to_string(),
        });
    };
    let Ok((removed_id, bound_session_id)) = state.close(client_id, context_selector, force)
    else {
        return Err(CloseContextError::NotFound);
    };
    // Whatever the client has selected now, paired with that context's session.
    let selected_now = state.selected_by_client.get(&client_id).copied();
    let replacement = selected_now.map(|context_id| {
        let bound = state.session_by_context.get(&context_id).copied();
        (context_id, bound)
    });
    Ok(CloseOutcome {
        removed_id,
        bound_session_id,
        replacement,
    })
}
/// Creates a session through the sessions plugin's typed `new-session`
/// command, blocking until the dispatch completes, and returns its id.
///
/// # Errors
///
/// * `CreateContextError::Failed` when the dispatch fails or the sessions
///   plugin reports a failure.
/// * `CreateContextError::InvalidName` when the sessions plugin rejects `name`.
fn create_session_via_sessions_plugin(
    caller: &(impl ServiceCaller + Sync),
    name: Option<String>,
) -> Result<SessionId, CreateContextError> {
    use bmux_sessions_plugin_api::sessions_commands::{self, NewSessionError};

    let mut client = dispatch_client(caller);
    let request = sessions_commands::client::new_session(&mut client, name);
    let reply = match bmux_plugin::block_on_typed_dispatch(request) {
        Ok(reply) => reply,
        Err(err) => {
            return Err(CreateContextError::Failed {
                reason: format!("sessions-commands:new-session failed: {err}"),
            });
        }
    };
    // Translate the sessions plugin's error variants one-to-one.
    reply
        .map(|ack| SessionId(ack.id))
        .map_err(|err| match err {
            NewSessionError::InvalidName { reason } => CreateContextError::InvalidName { reason },
            NewSessionError::Failed { reason } => CreateContextError::Failed { reason },
        })
}
/// Kills session `session_id` through the sessions plugin's typed
/// `kill-session` command (with its trailing flag set to `false`).
///
/// A session that is already gone counts as success.
///
/// # Errors
///
/// Returns a message when the dispatch fails or the sessions plugin reports a
/// failure.
fn kill_session_via_sessions_plugin(
    caller: &(impl ServiceCaller + Sync),
    session_id: SessionId,
) -> Result<(), String> {
    use bmux_sessions_plugin_api::sessions_commands::{self, KillSessionError};
    use bmux_sessions_plugin_api::sessions_state::SessionSelector;

    let target = SessionSelector {
        id: Some(session_id.0),
        name: None,
    };
    let mut client = dispatch_client(caller);
    let request = sessions_commands::client::kill_session(&mut client, target, false);
    let outcome = bmux_plugin::block_on_typed_dispatch(request)
        .map_err(|err| format!("sessions-commands:kill-session failed: {err}"))?;
    match outcome {
        Err(KillSessionError::Failed { reason }) => Err(reason),
        Ok(_) | Err(KillSessionError::NotFound) => Ok(()),
    }
}
/// Selects session `session_id` through the sessions plugin's typed
/// `select-session` command, blocking until the dispatch completes.
///
/// A session that cannot be found counts as success.
///
/// # Errors
///
/// Returns a message when the dispatch fails or the sessions plugin denies
/// the selection.
fn select_session_via_sessions_plugin(
    caller: &(impl ServiceCaller + Sync),
    session_id: SessionId,
) -> Result<(), String> {
    use bmux_sessions_plugin_api::sessions_commands::{self, SelectSessionError};
    use bmux_sessions_plugin_api::sessions_state::SessionSelector;

    let target = SessionSelector {
        id: Some(session_id.0),
        name: None,
    };
    let mut client = dispatch_client(caller);
    let request = sessions_commands::client::select_session(&mut client, target);
    let outcome = bmux_plugin::block_on_typed_dispatch(request)
        .map_err(|err| format!("sessions-commands:select-session failed: {err}"))?;
    match outcome {
        Err(SelectSessionError::Denied { reason }) => Err(reason),
        Ok(_) | Err(SelectSessionError::NotFound) => Ok(()),
    }
}
/// Typed `contexts-state` provider registered by
/// `ContextsPlugin::register_typed_services`.
pub struct ContextsStateHandle {
/// Caller passed to the `*_local` query functions.
caller: Arc<TypedServiceCaller>,
}
impl ContextsStateHandle {
/// Wraps `caller` in a new handle.
const fn new(caller: Arc<TypedServiceCaller>) -> Self {
Self { caller }
}
}
impl ContextsStateService for ContextsStateHandle {
fn list_contexts<'a>(
&'a self,
) -> Pin<Box<dyn Future<Output = Vec<ContextSummary>> + Send + 'a>> {
Box::pin(async move { list_contexts_local(self.caller.as_ref()).unwrap_or_default() })
}
fn get_context<'a>(
&'a self,
selector: StateContextSelector,
) -> Pin<
Box<
dyn Future<Output = std::result::Result<ContextSummary, ContextQueryError>> + Send + 'a,
>,
> {
Box::pin(async move {
let wire = WireSelector {
id: selector.id,
name: selector.name,
};
match get_context_local(self.caller.as_ref(), &wire) {
Ok(result) => result,
Err(reason) => Err(ContextQueryError::InvalidSelector { reason }),
}
})
}
fn current_context<'a>(
&'a self,
) -> Pin<Box<dyn Future<Output = Option<ContextSummary>> + Send + 'a>> {
Box::pin(async move {
current_context_local(self.caller.as_ref(), None)
.ok()
.flatten()
})
}
}
/// Typed `contexts-commands` provider registered by
/// `ContextsPlugin::register_typed_services`.
pub struct ContextsCommandsHandle {
/// Caller passed to the `*_local` command functions.
caller: Arc<TypedServiceCaller>,
}
impl ContextsCommandsHandle {
/// Wraps `caller` in a new handle.
const fn new(caller: Arc<TypedServiceCaller>) -> Self {
Self { caller }
}
}
/// Typed command surface. Each method converts its selector to the wire form
/// and runs the matching `*_local` function inside a boxed future; no client
/// id is supplied, so the client is resolved through the caller.
impl ContextsCommandsService for ContextsCommandsHandle {
    /// Creates a context; see `create_context_local`.
    fn create_context<'a>(
        &'a self,
        name: Option<String>,
        attributes: BTreeMap<String, String>,
    ) -> Pin<
        Box<dyn Future<Output = std::result::Result<ContextAck, CreateContextError>> + Send + 'a>,
    > {
        Box::pin(async move {
            let caller = self.caller.as_ref();
            create_context_local(caller, None, name, attributes)
        })
    }

    /// Selects a context; see `select_context_local`.
    fn select_context<'a>(
        &'a self,
        selector: StateContextSelector,
    ) -> Pin<
        Box<dyn Future<Output = std::result::Result<ContextAck, SelectContextError>> + Send + 'a>,
    > {
        Box::pin(async move {
            let caller = self.caller.as_ref();
            let wire = WireSelector {
                id: selector.id,
                name: selector.name,
            };
            select_context_local(caller, None, &wire)
        })
    }

    /// Closes a context; see `close_context_local`.
    fn close_context<'a>(
        &'a self,
        selector: StateContextSelector,
        force: bool,
    ) -> Pin<Box<dyn Future<Output = std::result::Result<ContextAck, CloseContextError>> + Send + 'a>>
    {
        Box::pin(async move {
            let caller = self.caller.as_ref();
            let wire = WireSelector {
                id: selector.id,
                name: selector.name,
            };
            close_context_local(caller, None, &wire, force)
        })
    }

    /// Renames a context; see `rename_context_local`.
    fn rename_context<'a>(
        &'a self,
        selector: StateContextSelector,
        name: String,
    ) -> Pin<
        Box<dyn Future<Output = std::result::Result<ContextAck, RenameContextError>> + Send + 'a>,
    > {
        Box::pin(async move {
            let caller = self.caller.as_ref();
            let wire = WireSelector {
                id: selector.id,
                name: selector.name,
            };
            rename_context_local(caller, &wire, name.as_str())
        })
    }
}
fn ipc_summary_to_typed(summary: PrimitiveContextSummary) -> ContextSummary {
ContextSummary {
id: summary.id,
name: summary.name,
attributes: summary.attributes,
}
}
// Export `ContextsPlugin` through the SDK macro, embedding the manifest text
// from `../plugin.toml` at compile time.
bmux_plugin_sdk::export_plugin!(ContextsPlugin, include_str!("../plugin.toml"));
#[cfg(test)]
mod tests {
use super::*;
use bmux_plugin_sdk::{ServiceKind as SdkServiceKind, encode_service_message};
use std::collections::BTreeMap;
#[test]
fn remove_contexts_for_session_clears_mapping_and_reselects_client() {
let client_id = ClientId::new();
let mut context_state = ContextState::default();
let first = context_state.create(client_id, Some("first".to_string()), BTreeMap::new());
let first_session_id = SessionId::new();
context_state
.bind_session(first.id, first_session_id)
.expect("first context should bind to session");
let second = context_state.create(client_id, Some("second".to_string()), BTreeMap::new());
let second_session_id = SessionId::new();
context_state
.bind_session(second.id, second_session_id)
.expect("second context should bind to session");
let _ = context_state
.select_for_client(client_id, &PrimitiveContextSelector::ById(first.id))
.expect("selecting first context should succeed");
let removed = context_state.remove_contexts_for_session(first_session_id);
assert_eq!(removed, vec![first.id]);
assert!(
context_state
.context_for_session(first_session_id)
.is_none()
);
assert_eq!(
context_state
.current_for_client(client_id)
.map(|context| context.id),
Some(second.id)
);
assert_eq!(
context_state.current_session_for_client(client_id),
Some(second_session_id)
);
}
/// Renaming a context must be reflected both in the value returned by `rename` and in
/// the entry stored in `ContextState::contexts`.
#[test]
fn rename_context_updates_display_name() {
    let client = ClientId::new();
    let mut state = ContextState::default();
    let target_id = state
        .create(client, Some(String::from("old")), BTreeMap::new())
        .id;

    let by_id = PrimitiveContextSelector::ById(target_id);
    let renamed = state
        .rename(&by_id, String::from("new"))
        .expect("rename should succeed");
    assert_eq!(renamed.name.as_deref(), Some("new"));

    // The stored entry carries the new name as well.
    let stored_name = state
        .contexts
        .get(&target_id)
        .and_then(|stored| stored.name.as_deref());
    assert_eq!(stored_name, Some("new"));
}
/// Closing the context a client currently has selected must move that client onto its
/// other context.
///
/// This is a plain `#[test]`: the body awaits nothing and only calls synchronous
/// `ContextState` methods, so no async runtime is started for it.
#[test]
fn close_active_context_promotes_most_recent_active_context() {
    let client_id = ClientId::new();
    let mut context_state = ContextState::default();

    // Two session-bound contexts for the same client.
    let first_id = context_state
        .create(client_id, Some("first".to_string()), BTreeMap::new())
        .id;
    context_state
        .bind_session(first_id, SessionId::new())
        .expect("first context should bind to session");
    let second_id = context_state
        .create(client_id, Some("second".to_string()), BTreeMap::new())
        .id;
    context_state
        .bind_session(second_id, SessionId::new())
        .expect("second context should bind to session");

    // Make the first context the active one, then close it.
    let first_selector = PrimitiveContextSelector::ById(first_id);
    let _ = context_state
        .select_for_client(client_id, &first_selector)
        .expect("selecting first context should succeed");
    let (closed_id, _closed_session) = context_state
        .close(client_id, &first_selector, true)
        .expect("closing first context should succeed");
    assert_eq!(closed_id, first_id);

    // The client must still have a current context, and it must be the survivor.
    let current = context_state
        .current_for_client(client_id)
        .expect("current context should exist after close");
    assert_eq!(current.id, second_id);
}
/// Installs a test service router that answers a fixed set of service calls with canned
/// responses, and returns the guard produced by `install_test_service_router`.
///
/// * `sessions-commands` / `new-session`, `select-session`, `kill-session`: an `Ok`
///   `SessionAck` carrying a freshly generated id.
/// * `clients-state` / `current-client`: an `Ok` `ClientSummary` for `fixed_client_id`
///   with nothing selected and nothing followed.
/// * `logging-command/v1` / `write`: an encoded unit value.
///
/// Any other `(interface, operation)` pair yields
/// `PluginError::UnsupportedHostOperation`.
fn install_contexts_test_router(
    fixed_client_id: Uuid,
) -> bmux_plugin::test_support::TestServiceRouterGuard {
    use bmux_clients_plugin_api::clients_state::{ClientQueryError, ClientSummary};
    use bmux_plugin::test_support::{TestServiceRouter, install_test_service_router};
    use bmux_sessions_plugin_api::sessions_commands::{
        KillSessionError, NewSessionError, SelectSessionError, SessionAck,
    };

    let router: TestServiceRouter = std::sync::Arc::new(
        move |_caller_plugin,
              _caller_client,
              _capability,
              _kind,
              interface,
              operation,
              _payload| {
            match (interface, operation) {
                ("sessions-commands", "new-session") => {
                    encode_service_message(&Ok::<_, NewSessionError>(SessionAck {
                        id: Uuid::new_v4(),
                    }))
                }
                ("sessions-commands", "select-session") => {
                    encode_service_message(&Ok::<_, SelectSessionError>(SessionAck {
                        id: Uuid::new_v4(),
                    }))
                }
                ("sessions-commands", "kill-session") => {
                    encode_service_message(&Ok::<_, KillSessionError>(SessionAck {
                        id: Uuid::new_v4(),
                    }))
                }
                ("clients-state", "current-client") => {
                    encode_service_message(&Ok::<_, ClientQueryError>(ClientSummary {
                        id: fixed_client_id,
                        selected_session_id: None,
                        selected_context_id: None,
                        following_client_id: None,
                        following_global: false,
                    }))
                }
                ("logging-command/v1", "write") => encode_service_message(&()),
                // Anything the test does not expect is reported as unsupported.
                _ => Err(bmux_plugin_sdk::PluginError::UnsupportedHostOperation {
                    operation: "contexts_test_router",
                }),
            }
        },
    );
    install_test_service_router(router)
}
fn test_service_context(caller_client_id: Uuid) -> bmux_plugin_sdk::NativeServiceContext {
bmux_plugin_sdk::NativeServiceContext {
plugin_id: "bmux.contexts".to_string(),
request: bmux_plugin_sdk::ServiceRequest {
caller_plugin_id: "bmux.windows".to_string(),
service: bmux_plugin_sdk::RegisteredService {
capability: bmux_plugin_sdk::HostScope::new("bmux.contexts.write")
.expect("capability should parse"),
kind: SdkServiceKind::Command,
interface_id: "contexts-commands".to_string(),
provider: bmux_plugin_sdk::ProviderId::Plugin("bmux.contexts".to_string()),
},
operation: "create-context".to_string(),
payload: Vec::new(),
},
required_capabilities: vec![
"bmux.contexts.read".to_string(),
"bmux.contexts.write".to_string(),
"bmux.clients.read".to_string(),
"bmux.sessions.write".to_string(),
"bmux.logs.write".to_string(),
],
provided_capabilities: vec!["bmux.contexts.read".to_string()],
services: Vec::new(),
available_capabilities: vec![
"bmux.contexts.read".to_string(),
"bmux.contexts.write".to_string(),
"bmux.clients.read".to_string(),
"bmux.sessions.write".to_string(),
"bmux.logs.write".to_string(),
],
enabled_plugins: vec!["bmux.contexts".to_string()],
plugin_search_roots: Vec::new(),
host: bmux_plugin_sdk::HostMetadata {
product_name: "bmux".to_string(),
product_version: env!("CARGO_PKG_VERSION").to_string(),
plugin_api_version: bmux_plugin_sdk::ApiVersion::new(1, 0),
plugin_abi_version: bmux_plugin_sdk::ApiVersion::new(1, 0),
},
connection: bmux_plugin_sdk::HostConnectionInfo {
config_dir: "/config".to_string(),
config_dir_candidates: vec!["/config".to_string()],
runtime_dir: "/runtime".to_string(),
data_dir: "/data".to_string(),
state_dir: "/state".to_string(),
},
settings: None,
plugin_settings_map: BTreeMap::new(),
caller_client_id: Some(caller_client_id),
host_kernel_bridge: None,
}
}
/// Subscribes to `ContextEvent`s on the global event bus and forwards a clone of each
/// one into a standard-library channel, returning the receiving end.
///
/// The subscription is taken before the forwarding thread is spawned. That thread runs
/// a current-thread Tokio runtime and stops once the bus receiver returns an error or
/// the returned `Receiver` has been dropped.
fn subscribe_events() -> std::sync::mpsc::Receiver<ContextEvent> {
    let (forward_tx, forward_rx) = std::sync::mpsc::channel();
    let mut bus_events = bmux_plugin::global_event_bus()
        .subscribe::<ContextEvent>(&contexts_events::EVENT_KIND)
        .expect("event channel should be registered");

    std::thread::spawn(move || {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("subscriber runtime should build")
            .block_on(async move {
                loop {
                    let Ok(event) = bus_events.recv().await else {
                        break;
                    };
                    // A send error means nobody is listening any more.
                    if forward_tx.send((*event).clone()).is_err() {
                        break;
                    }
                }
            });
    });

    forward_rx
}
/// Sleeps for 50 ms and then returns every event already queued on `rx`, in arrival
/// order, without blocking for more.
fn drain_events_after_short_wait(
    rx: &std::sync::mpsc::Receiver<ContextEvent>,
) -> Vec<ContextEvent> {
    const SETTLE_DELAY: std::time::Duration = std::time::Duration::from_millis(50);

    std::thread::sleep(SETTLE_DELAY);
    // `try_iter` stops at the first empty or disconnected poll, like a `try_recv` loop.
    rx.try_iter().collect()
}
/// Calls `create_context_local` against the stubbed service router and checks that the
/// events for the new context arrive as `Created`, then `Selected`, then
/// `SessionActiveContextChanged` carrying the initiating client id.
#[test]
fn create_context_emits_created_selected_and_session_active_change_in_order() {
    // Register a fresh state handle and the event channel before anything is emitted.
    let state_handle = std::sync::Arc::new(std::sync::RwLock::new(ContextState::default()));
    let _existing = global_plugin_state_registry().register::<ContextState>(&state_handle);
    bmux_plugin::global_event_bus()
        .register_channel::<ContextEvent>(contexts_events::EVENT_KIND);

    let client_id = Uuid::new_v4();
    let _router_guard = install_contexts_test_router(client_id);
    let events_rx = subscribe_events();
    let ctx = test_service_context(client_id);

    let context_id = create_context_local(
        &ctx,
        Some(client_id),
        Some("test-context".to_string()),
        BTreeMap::new(),
    )
    .expect("create-context should succeed")
    .id;

    let emitted = drain_events_after_short_wait(&events_rx);
    assert!(
        emitted.len() >= 3,
        "expected at least 3 events, got {emitted:?}"
    );

    // Keep only events that name the new context; `Closed` events are never counted.
    let concerns_new_context = |event: &ContextEvent| match event {
        ContextEvent::Created { context_id: id, .. }
        | ContextEvent::Selected { context_id: id }
        | ContextEvent::Renamed { context_id: id, .. }
        | ContextEvent::SessionActiveContextChanged { context_id: id, .. } => *id == context_id,
        ContextEvent::Closed { .. } => false,
    };
    let relevant: Vec<&ContextEvent> = emitted
        .iter()
        .filter(|event| concerns_new_context(event))
        .collect();
    assert_eq!(
        relevant.len(),
        3,
        "expected Created+Selected+SessionActiveContextChanged for {context_id}, got {emitted:?}"
    );

    let (created, selected, session_change) = (relevant[0], relevant[1], relevant[2]);
    assert!(
        matches!(created, ContextEvent::Created { .. }),
        "first event should be Created, got {:?}",
        created
    );
    assert!(
        matches!(selected, ContextEvent::Selected { .. }),
        "second event should be Selected, got {:?}",
        selected
    );
    let ContextEvent::SessionActiveContextChanged {
        initiator_client_id,
        ..
    } = session_change
    else {
        panic!(
            "third event should be SessionActiveContextChanged, got {:?}",
            session_change
        );
    };
    assert_eq!(
        *initiator_client_id,
        Some(client_id),
        "SessionActiveContextChanged must carry the initiating client id",
    );
}
}