1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
//! This module houses the structures that are specific to the management of a CrystalOrb game
//! server.
//!
//! The [`Server`] structure is what you create, store, and update, analogous to the
//! [`Client`](crate::client::Client) structure. Unlike the [`Client`](crate::client::Client), the
//! [`Server`] does not need a "loading" stage, and can be used directly after creation.
//!
//! The interface shares some similarities with the the [`Ready`](crate::client::stage::Ready)
//! client stage.

use crate::{
    fixed_timestepper::{FixedTimestepper, TerminationCondition, TimeKeeper},
    network_resource::{Connection, ConnectionHandleType, NetworkResource},
    timestamp::{Timestamp, Timestamped},
    world::{InitializationType, Simulation, World},
    Config,
};
use tracing::{debug, error, trace, warn};

/// This is the top-level structure of CrystalOrb for your game server, analogous to the
/// [`Client`](crate::client::Client) for game clients. You create, store, and update this server
/// instance to run your game on the server side.
#[derive(Debug)]
pub struct Server<WorldType: World> {
    timekeeping_simulation: TimeKeeper<
        Simulation<WorldType, { InitializationType::PreInitialized }>,
        { TerminationCondition::LastUndershoot },
    >,
    seconds_since_last_snapshot: f64,
    config: Config,
}

impl<WorldType: World> Server<WorldType> {
    /// Constructs a new [`Server`]. This function requires a `seconds_since_startup` parameter to
    /// initialize the server's simulation timestamp.
    pub fn new(config: Config, seconds_since_startup: f64) -> Self {
        let mut server = Self {
            timekeeping_simulation: TimeKeeper::new(Simulation::new(), config.clone()),
            seconds_since_last_snapshot: 0.0,
            config,
        };

        let initial_timestamp =
            Timestamp::from_seconds(seconds_since_startup, server.config.timestep_seconds)
                - server.config.lag_compensation_frame_count();
        server
            .timekeeping_simulation
            .reset_last_completed_timestamp(initial_timestamp);

        server
    }

    /// The timestamp of the most recent frame that has completed its simulation.
    /// This is typically one less than [`Server::simulating_timestamp`].
    pub fn last_completed_timestamp(&self) -> Timestamp {
        self.timekeeping_simulation.last_completed_timestamp()
    }

    /// The timestamp of the frame that is *in the process* of being simulated.
    /// This is typically one more than [`Server::simulating_timestamp`].
    pub fn simulating_timestamp(&self) -> Timestamp {
        self.timekeeping_simulation.simulating_timestamp()
    }

    /// The timestamp that clients are supposed to be simulating at the moment (which should always
    /// be ahead of the server to compensate for the latency between the server and the clients).
    ///
    /// This is also the timestamp that gets attached to the command when you call
    /// [`Server::issue_command`].
    pub fn estimated_client_simulating_timestamp(&self) -> Timestamp {
        self.simulating_timestamp() + self.config.lag_compensation_frame_count()
    }

    /// The timestamp that clients have supposed to have completed simulating (which should always
    /// be ahead of the server to compensate for the latency between the server and the clients).
    pub fn estimated_client_last_completed_timestamp(&self) -> Timestamp {
        self.last_completed_timestamp() + self.config.lag_compensation_frame_count()
    }

    fn apply_validated_command<NetworkResourceType: NetworkResource<WorldType>>(
        &mut self,
        command: &Timestamped<WorldType::CommandType>,
        command_source: Option<ConnectionHandleType>,
        net: &mut NetworkResourceType,
    ) {
        debug!("Received command from {:?} - {:?}", command_source, command);

        // Apply this command to our world later on.
        self.timekeeping_simulation.schedule_command(command);

        // Relay command to every other client.
        for (handle, mut connection) in net.connections() {
            // Don't send it back to the same client if there is one.
            if let Some(source_handle) = command_source {
                if handle == source_handle {
                    continue;
                }
            }
            let result = connection.send(command.clone());
            connection.flush::<Timestamped<WorldType::CommandType>>();
            if let Some(message) = result {
                error!("Failed to relay command to [{}]: {:?}", handle, message);
            }
        }
    }

    fn receive_command<NetworkResourceType: NetworkResource<WorldType>>(
        &mut self,
        command: &Timestamped<WorldType::CommandType>,
        command_source: ConnectionHandleType,
        net: &mut NetworkResourceType,
    ) {
        if WorldType::command_is_valid(command.inner(), command_source)
        // TODO: Is it valid to validate the timestamps?
        // && command.timestamp() >= self.timekeeping_simulation.last_completed_timestamp()
        // && command.timestamp() <= self.estimated_client_simulating_timestamp()
        {
            self.apply_validated_command(command, Some(command_source), net);
        }
    }

    /// Issue a command from the server to the world. The command will be scheduled to the
    /// estimated client's current timestamp.
    pub fn issue_command<NetworkResourceType: NetworkResource<WorldType>>(
        &mut self,
        command: WorldType::CommandType,
        net: &mut NetworkResourceType,
    ) {
        self.apply_validated_command(
            &Timestamped::new(command, self.estimated_client_simulating_timestamp()),
            None,
            net,
        );
    }

    /// Iterate through the commands that are being kept around. This is intended to be for
    /// diagnostic purposes.
    pub fn buffered_commands(
        &self,
    ) -> impl Iterator<Item = (Timestamp, &Vec<WorldType::CommandType>)> {
        self.timekeeping_simulation.buffered_commands()
    }

    /// Get the current display state of the server's world.
    pub fn display_state(&self) -> Timestamped<WorldType::DisplayStateType> {
        self.timekeeping_simulation
            .display_state()
            .expect("Server simulation does not need initialization")
    }

    /// Perform the next update. You would typically call this in your game engine's update loop of
    /// some kind.
    pub fn update<NetworkResourceType: NetworkResource<WorldType>>(
        &mut self,
        delta_seconds: f64,
        seconds_since_startup: f64,
        net: &mut NetworkResourceType,
    ) {
        let positive_delta_seconds = delta_seconds.max(0.0);
        #[allow(clippy::float_cmp)]
        if delta_seconds != positive_delta_seconds {
            warn!(
                "Attempted to update client with a negative delta_seconds of {}. Clamping it to zero.",
                delta_seconds
            );
        }
        let mut new_commands = Vec::new();
        let mut clock_syncs = Vec::new();
        for (handle, mut connection) in net.connections() {
            while let Some(command) = connection.recv_command() {
                new_commands.push((command, handle));
            }
            while let Some(mut clock_sync_message) = connection.recv_clock_sync() {
                trace!("Replying to clock sync message. client_id: {}", handle);
                clock_sync_message.server_seconds_since_startup = seconds_since_startup;
                clock_sync_message.client_id = handle;
                clock_syncs.push((handle, clock_sync_message));
            }
        }
        for (command, command_source) in new_commands {
            self.receive_command(&command, command_source, &mut *net);
        }
        for (handle, clock_sync_message) in clock_syncs {
            net.send_message(handle, clock_sync_message)
                .expect("Connection from which clocksync request came from should still exist");
        }

        self.timekeeping_simulation
            .update(positive_delta_seconds, seconds_since_startup);

        self.seconds_since_last_snapshot += positive_delta_seconds;
        if self.seconds_since_last_snapshot > self.config.snapshot_send_period {
            trace!(
                "Broadcasting snapshot at timestamp: {:?} (note: drift error: {})",
                self.timekeeping_simulation.last_completed_timestamp(),
                self.timekeeping_simulation
                    .timestamp_drift_seconds(seconds_since_startup),
            );
            self.seconds_since_last_snapshot = 0.0;
            net.broadcast_message(self.timekeeping_simulation.last_completed_snapshot());
        }
    }
}