Skip to main content

Connection

Struct Connection 

Source
pub struct Connection<T, I = ()>
where T: AsyncRead + AsyncWrite,
{ pub request_queue: RequestQueue<I>, /* private fields */ }
Expand description

A bidirectional LSP connection with split sender/receiver and request tracking.

Connection is the primary type for LSP communication. It provides:

At construction, a background drain task is spawned to forward messages from an internal channel to the underlying transport. This means send() never blocks on I/O — it only enqueues the message.

§Type Parameters

  • T: The underlying I/O stream type (AsyncRead + AsyncWrite)
  • I: Metadata type for incoming requests (default: ())

§Examples

§Basic construction

use lsp_server_tokio::Connection;

let (stream, _) = tokio::io::duplex(4096);

// Simple connection with unit metadata types
let conn: Connection<_, ()> = Connection::new(stream);

§With custom metadata types

use lsp_server_tokio::Connection;

let (stream, _) = tokio::io::duplex(4096);

// Connection tracking method names for incoming, JSON values for outgoing
let conn: Connection<_, String> = Connection::new(stream);

Fields§

§request_queue: RequestQueue<I>

Request queue for tracking pending incoming and outgoing requests.

  • request_queue.incoming: Track requests you’ve received and need to respond to
  • request_queue.outgoing: Track requests you’ve sent and are awaiting responses for

Implementations§

Source§

impl<T, I> Connection<T, I>
where T: AsyncRead + AsyncWrite + Unpin + Send + 'static,

Source

pub fn new(io: T) -> Self

Creates a new connection from an async I/O stream.

This constructor wraps the stream with crate::LspCodec for Content-Length framing, splits it into sender/receiver halves, and spawns a background drain task for outbound messages.

§Arguments
  • io - Any async I/O stream implementing AsyncRead + AsyncWrite + Unpin + Send + 'static
§Example
use lsp_server_tokio::Connection;

let (stream, _) = tokio::io::duplex(4096);
let conn: Connection<_, ()> = Connection::new(stream);
Source

pub fn from_transport(transport: Transport<T>) -> Self

Creates a new connection from an existing transport.

This is useful when you already have a Transport and want to upgrade it to a full Connection with request tracking.

§Arguments
  • transport - An existing LSP transport
§Example
use lsp_server_tokio::{transport, Connection};

let (stream, _) = tokio::io::duplex(4096);
let transport = transport(stream);
let conn: Connection<_, ()> = Connection::from_transport(transport);
Source

pub fn with_request_queue(io: T, request_queue: RequestQueue<I>) -> Self

Creates a new connection with a pre-existing request queue.

This constructor allows you to provide your own RequestQueue, which is useful for:

  • Testing with pre-populated request state
  • Migrating state between connections
  • Using custom metadata types
§Arguments
  • io - Any async I/O stream implementing AsyncRead + AsyncWrite
  • request_queue - A pre-existing request queue
§Example
use lsp_server_tokio::{Connection, RequestQueue};

let (stream, _) = tokio::io::duplex(4096);

// Create a queue with some pre-registered requests
use tokio_util::sync::CancellationToken;
let mut queue: RequestQueue<u32> = RequestQueue::new();
let token = CancellationToken::new();
queue.incoming.register(1.into(), 100, token);

let conn = Connection::with_request_queue(stream, queue);
assert!(conn.request_queue.incoming.is_pending(&1.into()));
Source

pub fn lifecycle_state(&self) -> LifecycleState

Returns the current lifecycle state.

Source

pub fn shutdown_token(&self) -> CancellationToken

Returns a token that is cancelled when shutdown is requested.

Use this to gracefully stop background tasks when the server is shutting down.

§Example
use lsp_server_tokio::Connection;
use tokio_util::sync::CancellationToken;

let (stream, _) = tokio::io::duplex(4096);
let conn: Connection<_, ()> = Connection::new(stream);

let token = conn.shutdown_token();
tokio::spawn(async move {
    loop {
        tokio::select! {
            _ = token.cancelled() => {
                // Clean up and exit
                break;
            }
            // ... other work ...
        }
    }
});
Source

pub fn on_shutdown(&self) -> impl Future<Output = ()> + '_

Returns a future that completes when shutdown is requested.

This is equivalent to self.shutdown_token().cancelled() but more convenient for simple use cases.

Source§

impl<T, I> Connection<T, I>
where T: AsyncRead + AsyncWrite,

Source

pub fn receiver_mut(&mut self) -> &mut MessageStream<T>

Returns a mutable reference to the inbound message stream.

Use with StreamExt::next() to receive messages from the peer.

§Example
use futures::StreamExt;
use lsp_server_tokio::Connection;

let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);

// Poll the next inbound message
// let msg = conn.receiver_mut().next().await;
Source

pub fn into_receiver(self) -> MessageStream<T>

Consumes the connection and returns the inbound message stream.

This is useful when you need to move the stream into a separate task.

§Example
use futures::StreamExt;
use lsp_server_tokio::Connection;

let (stream, _) = tokio::io::duplex(4096);
let conn: Connection<_, ()> = Connection::new(stream);

let mut receiver = conn.into_receiver();
// tokio::spawn(async move { while let Some(msg) = receiver.next().await { ... } });
Source

pub fn send(&self, message: Message) -> Result<(), Error>

Sends a message through the connection.

Messages are forwarded to the background drain task which writes them to the underlying transport. This method never blocks on I/O — it only enqueues the message on an unbounded channel.

§Errors

Returns an error if the background drain task has exited (connection closed).

§Example
use lsp_server_tokio::{Connection, Message, Request};

let (client_stream, _server_stream) = tokio::io::duplex(4096);
let conn: Connection<_, ()> = Connection::new(client_stream);

let request = Message::Request(Request::new(1, "test", None));
conn.send(request).unwrap();
Source§

impl<T, I> Connection<T, I>
where T: AsyncRead + AsyncWrite, I: Default,

Source

pub fn route(&mut self, message: Message) -> IncomingMessage

Routes an incoming message, delivering responses to pending outgoing requests.

Call this method for each message received from the transport to classify it and automatically deliver responses to their corresponding outgoing request receivers.

For IncomingMessage::Request variants, the request is automatically registered with a CancellationToken that is:

  • Cancelled when $/cancelRequest is received for this request ID
  • Cancelled when the connection shuts down
§Returns
§Example
use lsp_server_tokio::{Connection, Message, IncomingMessage, Response};
use futures::StreamExt;

let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);

while let Some(Ok(msg)) = conn.receiver_mut().next().await {
    match conn.route(msg) {
        IncomingMessage::Request(req, token) => {
            // Handle request with cooperative cancellation
            println!("Request: {}", req.method);
            // Use token.cancelled().await in select! for cancellation
            // After handling, call conn.request_queue.incoming.complete(&req.id)
        }
        IncomingMessage::Notification(notif) => {
            // Handle notification
            println!("Notification: {}", notif.method);
        }
        IncomingMessage::CancelHandled => {
            // `$/cancelRequest` was handled by route()
        }
        IncomingMessage::ResponseRouted => {
            // Response delivered to awaiting task
        }
        IncomingMessage::ResponseUnknown(resp) => {
            // Log unexpected response
            eprintln!("Unknown response: {:?}", resp.id);
        }
        _ => {}
    }
}
Source

pub fn cancel_incoming(&mut self, id: impl Into<RequestId>) -> bool

Cancels an incoming request by request ID.

This is a convenience method that cancels the CancellationToken for a registered incoming request. Use this when you receive a $/cancelRequest notification.

§Arguments
  • id - The request ID to cancel
§Returns

true if the request was found and cancelled, false if not found.

§Example
use lsp_server_tokio::Connection;

let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);

// After receiving $/cancelRequest for ID 42:
let was_cancelled = conn.cancel_incoming(42);
Source

pub fn client_sender(&mut self) -> ClientSender

Returns a cloneable ClientSender for sending requests, responses, and notifications.

The first call initializes the response routing infrastructure. Subsequent calls return a new handle to the same underlying channel.

§Example
use lsp_server_tokio::{Connection, Response};

let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);

let sender1 = conn.client_sender();
let sender2 = conn.client_sender(); // returns another handle
Source§

impl<T, I> Connection<T, I>
where T: AsyncRead + AsyncWrite,

Source

pub fn cancel(&mut self, id: impl Into<RequestId>) -> Result<bool, Error>

Cancels an outgoing request by sending $/cancelRequest and removing it from the queue.

This method is for cancelling requests that this connection sent (outgoing requests). For incoming request cancellation, use route() which handles $/cancelRequest notifications automatically.

§Behavior
  1. Sends a $/cancelRequest notification with the given request ID
  2. Removes the request from the outgoing queue (the awaiting receiver will get RecvError)
§Returns
  • Ok(true) if the request was pending and cancelled
  • Ok(false) if the request was not found in the queue
  • Err if sending the notification failed
§Errors

Returns an error if the cancellation notification cannot be sent to the peer.

§Example
use lsp_server_tokio::{Connection, Response};
use futures::SinkExt;

let (client_stream, server_stream) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(client_stream);

// Register an outgoing request
let rx = conn.request_queue.outgoing.register(42.into());

// Cancel it
let was_pending = conn.cancel(42).unwrap();
assert!(was_pending);

// The receiver will get an error
assert!(rx.await.is_err());
Source§

impl<T, I> Connection<T, I>
where T: AsyncRead + AsyncWrite,

Source

pub async fn initialize_start( &mut self, ) -> Result<(RequestId, Value), ProtocolError>

Waits for the initialize request from the client.

This method blocks until an initialize request is received, rejecting any other requests with ServerNotInitialized error and dropping notifications (except exit which disconnects).

Returns the request ID and params for the initialize request. You must call initialize_finish() with the same ID to complete the handshake.

§Errors
§Example
use lsp_server_tokio::Connection;

let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);

let (id, params) = conn.initialize_start().await.unwrap();
// Process params, build the full InitializeResult payload...
let capabilities = serde_json::json!({"textDocumentSync": 1});
let result = serde_json::json!({
    "capabilities": capabilities,
    "serverInfo": {"name": "my-server", "version": "0.1.0"}
});
conn.initialize_finish(id, result).await.unwrap();
Source

pub async fn initialize_finish( &mut self, id: RequestId, initialize_result: Value, ) -> Result<(), ProtocolError>

Completes the initialization handshake.

Sends the InitializeResult response and waits for the initialized notification from the client. After this returns Ok(()), the connection is in Running state and ready for normal operation.

§Arguments
  • id - The request ID from initialize_start()
  • initialize_result - The full InitializeResult value as JSON, sent verbatim as the response result. Must include capabilities and may include serverInfo or any other fields defined by the LSP spec.
§Errors
§Example
use lsp_server_tokio::Connection;

let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);

let (id, _params) = conn.initialize_start().await.unwrap();
let result = serde_json::json!({
    "capabilities": {"textDocumentSync": 1},
    "serverInfo": {"name": "my-server", "version": "0.1.0"}
});
conn.initialize_finish(id, result).await.unwrap();

assert!(conn.is_running());
Source

pub async fn initialize( &mut self, server_capabilities: Value, ) -> Result<Value, ProtocolError>

Performs complete LSP initialization handshake.

This is a convenience method that calls initialize_start() followed by initialize_finish(). Returns the initialize params from the client.

Unlike initialize_finish() which takes a full InitializeResult, this method takes just the server capabilities and wraps them in {"capabilities": ...} automatically.

§Arguments
  • server_capabilities - The server’s capabilities as JSON (will be wrapped in {"capabilities": server_capabilities})
§Errors
§Example
use lsp_server_tokio::Connection;

let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);

let capabilities = serde_json::json!({"textDocumentSync": 1});
let client_params = conn.initialize(capabilities).await.unwrap();
println!("Client capabilities: {}", client_params);
Source

pub fn handle_shutdown(&mut self, id: RequestId) -> Result<(), ProtocolError>

Handles a shutdown request.

Transitions to ShuttingDown state, cancels the shutdown token, and sends a null response. After this, only exit notification should be received.

§Arguments
  • id - The request ID of the shutdown request
§Errors
§Example
use lsp_server_tokio::{Connection, Message};
use futures::StreamExt;

let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);

// ... after initialization ...
// When shutdown request is received:
// if let Message::Request(req) = msg && req.method == "shutdown" {
//     conn.handle_shutdown(req.id).unwrap();
//     assert!(conn.is_shutting_down());
// }
Source

pub fn handle_exit(&mut self) -> ExitCode

Handles the exit notification.

Returns ExitCode::Success (exit code 0) if shutdown was received first, or ExitCode::Error (exit code 1) if exit came without shutdown.

§Example
use lsp_server_tokio::{Connection, ExitCode, LifecycleState};

let (stream, _) = tokio::io::duplex(4096);
let mut conn: Connection<_, ()> = Connection::new(stream);

// Exit without shutdown - dirty exit
let code = conn.handle_exit();
assert_eq!(code, ExitCode::Error);
Source

pub fn is_running(&self) -> bool

Returns true if the connection is in Running state.

The connection is in Running state after successful initialization and before shutdown is requested.

Source

pub fn is_shutting_down(&self) -> bool

Returns true if shutdown has been requested.

After shutdown, the server should only expect the exit notification.

Source§

impl Connection<StdioTransport, ()>

Source

pub fn stdio() -> Self

Creates a connection using stdin for reading and stdout for writing.

This is the typical constructor for LSP servers that communicate over standard I/O. Uses unit types () for request queue metadata.

§Example
use futures::StreamExt;
use lsp_server_tokio::{Connection, Message};

let mut conn = Connection::stdio();

// Process incoming messages
while let Some(result) = conn.receiver_mut().next().await {
    match result {
        Ok(Message::Request(req)) => {
            println!("Received request: {}", req.method);
            // Handle request...
        }
        Ok(Message::Notification(notif)) => {
            println!("Received notification: {}", notif.method);
        }
        Ok(Message::Response(resp)) => {
            println!("Received response for: {:?}", resp.id);
        }
        Err(e) => {
            eprintln!("Error: {}", e);
            break;
        }
    }
}
§Note

This constructor uses unit types () for both incoming and outgoing request metadata. If you need custom metadata types, use Connection::new() with a StdioTransport directly:

use lsp_server_tokio::{Connection, StdioTransport};

let conn: Connection<StdioTransport, String> = Connection::new(StdioTransport::new());

Auto Trait Implementations§

§

impl<T, I> Freeze for Connection<T, I>

§

impl<T, I = ()> !RefUnwindSafe for Connection<T, I>

§

impl<T, I> Send for Connection<T, I>
where T: Send, I: Send,

§

impl<T, I> Sync for Connection<T, I>
where T: Send, I: Sync,

§

impl<T, I> Unpin for Connection<T, I>
where I: Unpin,

§

impl<T, I> UnsafeUnpin for Connection<T, I>

§

impl<T, I = ()> !UnwindSafe for Connection<T, I>

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.