pub struct Connection<T, I = ()>
where
    T: AsyncRead + AsyncWrite,
{
    pub request_queue: RequestQueue<I>,
    /* private fields */
}
A bidirectional LSP connection with split sender/receiver and request tracking.
Connection is the primary type for LSP communication. It provides:
- send(): Send outbound messages (non-blocking, channel-based)
- receiver_mut(): A stream for receiving inbound messages
- request_queue: Tracking for pending requests
At construction, a background drain task is spawned to forward messages from
an internal channel to the underlying transport. This means send()
never blocks on I/O — it only enqueues the message.
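The enqueue-then-drain pattern described above can be sketched with the standard library alone. This is an illustrative analogy, not the crate's implementation: `spawn_drain` is a hypothetical helper, and a thread with `std::sync::mpsc` stands in for the async task and channel the crate actually uses.

```rust
use std::io::Write;
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical helper: spawns a drain thread that owns `writer` and
/// writes every queued message to it until all senders are dropped.
/// The writer is handed back through the join handle for inspection.
fn spawn_drain<W>(mut writer: W) -> (Sender<String>, JoinHandle<W>)
where
    W: Write + Send + 'static,
{
    // `mpsc::channel` is unbounded, so `Sender::send` only enqueues.
    let (tx, rx) = mpsc::channel::<String>();
    let handle = thread::spawn(move || {
        // The loop ends once every `Sender` has been dropped.
        for message in rx {
            // Only this thread performs the (potentially slow) I/O.
            if writer.write_all(message.as_bytes()).is_err() {
                break;
            }
        }
        writer
    });
    (tx, handle)
}
```

In this sketch, a `send` call fails only after the drain thread has exited and dropped its receiver, which is the same shape of failure the `send()` method documents under Errors.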
§Type Parameters
- T: The underlying I/O stream type (AsyncRead + AsyncWrite)
- I: Metadata type for incoming requests (default: ())
§Examples
§Basic construction
use lsp_server_tokio::Connection;
let (stream, _) = tokio::io::duplex(4096);
// Simple connection with the unit type as request metadata
let conn: Connection<_, ()> = Connection::new(stream);
§With custom metadata types
use lsp_server_tokio::Connection;
let (stream, _) = tokio::io::duplex(4096);
// Connection that stores method names (String) as metadata for incoming requests
let conn: Connection<_, String> = Connection::new(stream);
Fields§
§request_queue: RequestQueue<I>
Request queue for tracking pending incoming and outgoing requests.
- request_queue.incoming: Track requests you’ve received and need to respond to
- request_queue.outgoing: Track requests you’ve sent and are awaiting responses for
Implementations§
impl<T, I> Connection<T, I>
pub fn new(io: T) -> Self
Creates a new connection from an async I/O stream.
This constructor wraps the stream with crate::LspCodec for Content-Length
framing, splits it into sender/receiver halves, and spawns a background
drain task for outbound messages.
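Content-Length framing is the LSP base-protocol wire format: a `Content-Length: N` header, a blank line, then exactly N bytes of JSON. The following is a minimal std-only sketch of that framing; `frame_message` and `parse_frame` are hypothetical names and do not reflect how `LspCodec` is implemented. As a simplification, the parser matches the header name case-sensitively and ignores other headers such as `Content-Type`.

```rust
/// Frames a JSON body with the LSP base-protocol header.
fn frame_message(body: &str) -> Vec<u8> {
    // Content-Length counts the bytes of the body, not its characters.
    let mut out = format!("Content-Length: {}\r\n\r\n", body.len()).into_bytes();
    out.extend_from_slice(body.as_bytes());
    out
}

/// Splits one complete frame off the front of `buf`.
/// Returns the body and the number of bytes consumed, or `None` when
/// the frame is incomplete or the header cannot be parsed.
fn parse_frame(buf: &[u8]) -> Option<(&[u8], usize)> {
    // The header section ends at the first empty line.
    let header_end = buf.windows(4).position(|w| w == b"\r\n\r\n")?;
    let header = std::str::from_utf8(&buf[..header_end]).ok()?;
    let len: usize = header
        .split("\r\n")
        .find_map(|line| line.strip_prefix("Content-Length:"))?
        .trim()
        .parse()
        .ok()?;
    let start = header_end + 4;
    let end = start.checked_add(len)?;
    // `get` returns `None` if the body has not fully arrived yet.
    buf.get(start..end).map(|body| (body, end))
}
```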
§Arguments
- io: Any async I/O stream implementing AsyncRead + AsyncWrite + Unpin + Send + 'static
§Example
use lsp_server_tokio::Connection;
let (stream, _) = tokio::io::duplex(4096);
let conn: Connection<_, ()> = Connection::new(stream);
pub fn from_transport(transport: Transport<T>) -> Self
Creates a new connection from an existing transport.
This is useful when you already have a Transport and want to
upgrade it to a full Connection with request tracking.
§Arguments
- transport: An existing LSP transport
§Example
use lsp_server_tokio::{transport, Connection};
let (stream, _) = tokio::io::duplex(4096);
let transport = transport(stream);
let conn: Connection<_, ()> = Connection::from_transport(transport);
pub fn with_request_queue(io: T, request_queue: RequestQueue<I>) -> Self
Creates a new connection with a pre-existing request queue.
This constructor allows you to provide your own RequestQueue, which
is useful for:
- Testing with pre-populated request state
- Migrating state between connections
- Using custom metadata types
§Arguments
- io: Any async I/O stream implementing AsyncRead + AsyncWrite
- request_queue: A pre-existing request queue
§Example
use lsp_server_tokio::{Connection, RequestQueue};
let (stream, _) = tokio::io::duplex(4096);
// Create a queue with some pre-registered requests
use tokio_util::sync::CancellationToken;
let mut queue: RequestQueue<u32> = RequestQueue::new();
let token = CancellationToken::new();
queue.incoming.register(1.into(), 100, token);
let conn = Connection::with_request_queue(stream, queue);
assert!(conn.request_queue.incoming.is_pending(&1.into()));
pub fn lifecycle_state(&self) -> LifecycleState
Returns the current lifecycle state.
pub fn shutdown_token(&self) -> CancellationToken
Returns a token that is cancelled when shutdown is requested.
Use this to gracefully stop background tasks when the server is shutting down.
§Example
use lsp_server_tokio::Connection;
use tokio_util::sync::CancellationToken;
let (stream, _) = tokio::io::duplex(4096);
let conn: Connection<_, ()> = Connection::new(stream);
let token = conn.shutdown_token();
tokio::spawn(async move {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                // Clean up and exit
                break;
            }
            // ... other work ...
        }
    }
});
pub fn on_shutdown(&self) -> impl Future<Output = ()> + '_
Returns a future that completes when shutdown is requested.
This is equivalent to self.shutdown_token().cancelled() but more
convenient for simple use cases.
impl<T, I> Connection<T, I>
where
    T: AsyncRead + AsyncWrite,
pub fn receiver_mut(&mut self) -> &mut MessageStream<T>
Returns a mutable reference to the inbound message stream.
Use with StreamExt::next() to receive
messages from the peer.
§Example
use futures::StreamExt;
use lsp_server_tokio::Connection;
let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);
// Poll the next inbound message
// let msg = conn.receiver_mut().next().await;
pub fn into_receiver(self) -> MessageStream<T>
Consumes the connection and returns the inbound message stream.
This is useful when you need to move the stream into a separate task.
§Example
use futures::StreamExt;
use lsp_server_tokio::Connection;
let (stream, _) = tokio::io::duplex(4096);
let conn: Connection<_, ()> = Connection::new(stream);
let mut receiver = conn.into_receiver();
// tokio::spawn(async move { while let Some(msg) = receiver.next().await { ... } });
pub fn send(&self, message: Message) -> Result<(), Error>
Sends a message through the connection.
Messages are forwarded to the background drain task which writes them to the underlying transport. This method never blocks on I/O — it only enqueues the message on an unbounded channel.
§Errors
Returns an error if the background drain task has exited (connection closed).
§Example
use lsp_server_tokio::{Connection, Message, Request};
let (client_stream, _server_stream) = tokio::io::duplex(4096);
let conn: Connection<_, ()> = Connection::new(client_stream);
let request = Message::Request(Request::new(1, "test", None));
conn.send(request).unwrap();
impl<T, I> Connection<T, I>
pub fn route(&mut self, message: Message) -> IncomingMessage
Routes an incoming message, delivering responses to pending outgoing requests.
Call this method for each message received from the transport to classify it and automatically deliver responses to their corresponding outgoing request receivers.
For IncomingMessage::Request variants, the request is automatically registered with a CancellationToken that is:
- Cancelled when $/cancelRequest is received for this request ID
- Cancelled when the connection shuts down
§Returns
- IncomingMessage::Request: Handle the request and send a response
- IncomingMessage::Notification: Handle the notification
- IncomingMessage::CancelHandled: A $/cancelRequest notification was handled by route()
- IncomingMessage::ResponseRouted: Response was delivered to the awaiting receiver
- IncomingMessage::ResponseUnknown: No pending request for this response ID
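The classification above follows the JSON-RPC convention that a request carries both an `id` and a `method`, a notification carries only a `method`, and a response carries only an `id`. The sketch below illustrates that rule with hypothetical types (`RawMessage`, `Routed`, `classify`). It is not the crate's `route()` implementation, it restricts IDs to integers for brevity, and it omits `$/cancelRequest` handling.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified view of a decoded JSON-RPC message.
struct RawMessage {
    id: Option<i64>,
    method: Option<String>,
}

#[derive(Debug, PartialEq)]
enum Routed {
    Request,
    Notification,
    ResponseRouted,
    ResponseUnknown,
    Invalid,
}

/// Classifies a message. `pending` holds the IDs of outgoing requests
/// that are still awaiting a response.
fn classify(msg: &RawMessage, pending: &mut HashSet<i64>) -> Routed {
    match (msg.id, msg.method.as_deref()) {
        (Some(_), Some(_)) => Routed::Request,
        (None, Some(_)) => Routed::Notification,
        (Some(id), None) => {
            // A response is "routed" only if someone is waiting for it.
            if pending.remove(&id) {
                Routed::ResponseRouted
            } else {
                Routed::ResponseUnknown
            }
        }
        (None, None) => Routed::Invalid,
    }
}
```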
§Example
use lsp_server_tokio::{Connection, Message, IncomingMessage, Response};
use futures::StreamExt;
let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);
while let Some(Ok(msg)) = conn.receiver_mut().next().await {
    match conn.route(msg) {
        IncomingMessage::Request(req, token) => {
            // Handle request with cooperative cancellation
            println!("Request: {}", req.method);
            // Use token.cancelled().await in select! for cancellation
            // After handling, call conn.request_queue.incoming.complete(&req.id)
        }
        IncomingMessage::Notification(notif) => {
            // Handle notification
            println!("Notification: {}", notif.method);
        }
        IncomingMessage::CancelHandled => {
            // `$/cancelRequest` was handled by route()
        }
        IncomingMessage::ResponseRouted => {
            // Response delivered to awaiting task
        }
        IncomingMessage::ResponseUnknown(resp) => {
            // Log unexpected response
            eprintln!("Unknown response: {:?}", resp.id);
        }
        _ => {}
    }
}
pub fn cancel_incoming(&mut self, id: impl Into<RequestId>) -> bool
Cancels an incoming request by request ID.
This is a convenience method that cancels the CancellationToken for a registered
incoming request. Use this when you receive a $/cancelRequest notification.
§Arguments
- id: The request ID to cancel
§Returns
true if the request was found and cancelled, false if not found.
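The found/not-found return value can be illustrated with a small std-only sketch. `CancelFlag` and `IncomingRequests` are hypothetical stand-ins: the crate uses `CancellationToken` from tokio_util, whereas this sketch uses a shared `AtomicBool` that a handler would poll cooperatively.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a cancellation token: a shared flag.
#[derive(Clone, Default)]
struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Hypothetical table of in-flight incoming requests.
#[derive(Default)]
struct IncomingRequests {
    flags: HashMap<i64, CancelFlag>,
}

impl IncomingRequests {
    /// Registers a request and returns the flag its handler should poll.
    fn register(&mut self, id: i64) -> CancelFlag {
        let flag = CancelFlag::default();
        self.flags.insert(id, flag.clone());
        flag
    }

    /// Returns `true` if the request was found and cancelled.
    fn cancel(&mut self, id: i64) -> bool {
        match self.flags.get(&id) {
            Some(flag) => {
                flag.cancel();
                true
            }
            None => false,
        }
    }
}
```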
§Example
use lsp_server_tokio::Connection;
let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);
// After receiving $/cancelRequest for ID 42:
let was_cancelled = conn.cancel_incoming(42);
pub fn client_sender(&mut self) -> ClientSender
Returns a cloneable ClientSender for sending requests, responses,
and notifications.
The first call initializes the response routing infrastructure. Subsequent calls return a new handle to the same underlying channel.
§Example
use lsp_server_tokio::{Connection, Response};
let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);
let sender1 = conn.client_sender();
let sender2 = conn.client_sender(); // returns another handle
impl<T, I> Connection<T, I>
where
    T: AsyncRead + AsyncWrite,
pub fn cancel(&mut self, id: impl Into<RequestId>) -> Result<bool, Error>
Cancels an outgoing request by sending $/cancelRequest and removing it from the queue.
This method is for cancelling requests that this connection sent (outgoing requests).
For incoming request cancellation, use route() which handles
$/cancelRequest notifications automatically.
§Behavior
- Sends a $/cancelRequest notification with the given request ID
- Removes the request from the outgoing queue (the awaiting receiver will get RecvError)
§Returns
- Ok(true) if the request was pending and cancelled
- Ok(false) if the request was not found in the queue
- Err if sending the notification failed
§Errors
Returns an error if the cancellation notification cannot be sent to the peer.
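The queue-side effect of cancellation (the awaiting receiver observing an error once its entry is removed) can be sketched with standard-library channels. `OutgoingRequests` is a hypothetical type, and `std::sync::mpsc` replaces whatever channel the crate uses internally. The sketch also leaves out sending the `$/cancelRequest` notification itself.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical outgoing-request table keyed by request ID.
#[derive(Default)]
struct OutgoingRequests {
    pending: HashMap<i64, Sender<String>>,
}

impl OutgoingRequests {
    /// Registers a request and returns the receiver for its response.
    fn register(&mut self, id: i64) -> Receiver<String> {
        let (tx, rx) = mpsc::channel();
        self.pending.insert(id, tx);
        rx
    }

    /// Delivers a response. Returns `false` if no request with this ID
    /// is pending or its receiver has already been dropped.
    fn complete(&mut self, id: i64, response: String) -> bool {
        match self.pending.remove(&id) {
            Some(tx) => tx.send(response).is_ok(),
            None => false,
        }
    }

    /// Removes the entry. Dropping the sender makes the receiver's
    /// `recv()` return `Err(RecvError)`.
    fn cancel(&mut self, id: i64) -> bool {
        self.pending.remove(&id).is_some()
    }
}
```

Returning `false` for an unknown ID mirrors the `Ok(false)` case listed under Returns.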
§Example
use lsp_server_tokio::{Connection, Response};
use futures::SinkExt;
let (client_stream, server_stream) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(client_stream);
// Register an outgoing request
let rx = conn.request_queue.outgoing.register(42.into());
// Cancel it
let was_pending = conn.cancel(42).unwrap();
assert!(was_pending);
// The receiver will get an error
assert!(rx.await.is_err());
impl<T, I> Connection<T, I>
where
    T: AsyncRead + AsyncWrite,
pub async fn initialize_start(&mut self) -> Result<(RequestId, Value), ProtocolError>
Waits for the initialize request from the client.
This method waits until an initialize request is received. Until then, it
rejects any other request with a ServerNotInitialized error and drops
notifications, except exit, which disconnects.
Returns the request ID and params for the initialize request.
You must call initialize_finish() with the
same ID to complete the handshake.
§Errors
- ProtocolError::Disconnected if the connection is closed or an exit notification is received
- ProtocolError::Io if an I/O error occurs
§Example
use lsp_server_tokio::Connection;
let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);
let (id, params) = conn.initialize_start().await.unwrap();
// Process params, build the full InitializeResult payload...
let capabilities = serde_json::json!({"textDocumentSync": 1});
let result = serde_json::json!({
"capabilities": capabilities,
"serverInfo": {"name": "my-server", "version": "0.1.0"}
});
conn.initialize_finish(id, result).await.unwrap();
pub async fn initialize_finish(&mut self, id: RequestId, initialize_result: Value) -> Result<(), ProtocolError>
Completes the initialization handshake.
Sends the InitializeResult response and waits for the initialized
notification from the client. After this returns Ok(()), the
connection is in Running state and ready for normal operation.
§Arguments
- id: The request ID from initialize_start()
- initialize_result: The full InitializeResult value as JSON, sent verbatim as the response result. Must include capabilities and may include serverInfo or any other fields defined by the LSP spec.
§Errors
- ProtocolError::Disconnected if the connection is closed
- ProtocolError::InitializeTimeout if the client does not send initialized within 60 seconds
- ProtocolError::Io if an I/O error occurs
§Example
use lsp_server_tokio::Connection;
let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);
let (id, _params) = conn.initialize_start().await.unwrap();
let result = serde_json::json!({
"capabilities": {"textDocumentSync": 1},
"serverInfo": {"name": "my-server", "version": "0.1.0"}
});
conn.initialize_finish(id, result).await.unwrap();
assert!(conn.is_running());
pub async fn initialize(&mut self, server_capabilities: Value) -> Result<Value, ProtocolError>
Performs complete LSP initialization handshake.
This is a convenience method that calls initialize_start()
followed by initialize_finish().
Returns the initialize params from the client.
Unlike initialize_finish() which takes a full
InitializeResult, this method takes just the server capabilities and
wraps them in {"capabilities": ...} automatically.
§Arguments
- server_capabilities: The server’s capabilities as JSON (will be wrapped in {"capabilities": server_capabilities})
§Errors
- ProtocolError::Disconnected if the connection is closed
- ProtocolError::Io if an I/O error occurs
§Example
use lsp_server_tokio::Connection;
let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);
let capabilities = serde_json::json!({"textDocumentSync": 1});
let client_params = conn.initialize(capabilities).await.unwrap();
println!("Client capabilities: {}", client_params);
pub fn handle_shutdown(&mut self, id: RequestId) -> Result<(), ProtocolError>
Handles a shutdown request.
Transitions to the ShuttingDown state, cancels the shutdown token,
and sends a null response. After this, the only message the server
should expect is the exit notification.
§Arguments
- id: The request ID of the shutdown request
§Errors
- ProtocolError::Io if sending the response fails
§Example
use lsp_server_tokio::{Connection, Message};
use futures::StreamExt;
let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);
// ... after initialization ...
// When shutdown request is received:
// if let Message::Request(req) = msg && req.method == "shutdown" {
//     conn.handle_shutdown(req.id).unwrap();
//     assert!(conn.is_shutting_down());
// }
pub fn handle_exit(&mut self) -> ExitCode
Handles the exit notification.
Returns ExitCode::Success (exit code 0) if shutdown was received first,
or ExitCode::Error (exit code 1) if exit came without shutdown.
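The exit-code rule (0 when shutdown preceded exit, 1 otherwise) amounts to a small state machine. The sketch below uses hypothetical `State` and `Lifecycle` types rather than the crate's `LifecycleState` and `ExitCode`, and it models only the states relevant to this rule.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Running,
    ShuttingDown,
    Exited,
}

/// Hypothetical lifecycle tracker for a single connection.
struct Lifecycle {
    state: State,
}

impl Lifecycle {
    /// Records that a `shutdown` request was received.
    fn on_shutdown(&mut self) {
        self.state = State::ShuttingDown;
    }

    /// Records the `exit` notification and returns the process exit
    /// code: 0 if `shutdown` came first, 1 otherwise.
    fn on_exit(&mut self) -> i32 {
        let code = if self.state == State::ShuttingDown { 0 } else { 1 };
        self.state = State::Exited;
        code
    }
}
```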
§Example
use lsp_server_tokio::{Connection, ExitCode, LifecycleState};
let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);
// Exit without shutdown - dirty exit
let code = conn.handle_exit();
assert_eq!(code, ExitCode::Error);
pub fn is_running(&self) -> bool
Returns true if the connection is in Running state.
The connection is in Running state after successful initialization and before shutdown is requested.
pub fn is_shutting_down(&self) -> bool
Returns true if shutdown has been requested.
After shutdown, the server should only expect the exit notification.
impl Connection<StdioTransport, ()>
pub fn stdio() -> Self
Creates a connection using stdin for reading and stdout for writing.
This is the typical constructor for LSP servers that communicate
over standard I/O. Uses the unit type () for request queue metadata.
§Example
use futures::StreamExt;
use lsp_server_tokio::{Connection, Message};
let mut conn = Connection::stdio();
// Process incoming messages
while let Some(result) = conn.receiver_mut().next().await {
    match result {
        Ok(Message::Request(req)) => {
            println!("Received request: {}", req.method);
            // Handle request...
        }
        Ok(Message::Notification(notif)) => {
            println!("Received notification: {}", notif.method);
        }
        Ok(Message::Response(resp)) => {
            println!("Received response for: {:?}", resp.id);
        }
        Err(e) => {
            eprintln!("Error: {}", e);
            break;
        }
    }
}
§Note
This constructor uses the unit type () for incoming request metadata.
If you need a custom metadata type, use
Connection::new() with a StdioTransport directly:
use lsp_server_tokio::{Connection, StdioTransport};
let conn: Connection<StdioTransport, String> = Connection::new(StdioTransport::new());